Coverage for yuio / complete.py: 74%

779 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-04 10:05 +0000

1# Yuio project, MIT license. 

2# 

3# https://github.com/taminomara/yuio/ 

4# 

5# You're free to copy this file to your project and edit it for your needs, 

6# just keep this copyright line please :3 

7 

8""" 

9This module provides autocompletion functionality for widgets and CLI. 

10 

11 

12Completer basics 

13---------------- 

14 

15All completers are derived from the :class:`Completer` base class 

16with a simple interface: 

17 

18.. autoclass:: Completer 

19 :members: 

20 

21.. autoclass:: Completion 

22 :members: 

23 

24 

25Completers 

26---------- 

27 

28Yuio provides basic completers that cover most of the cases: 

29 

30.. autoclass:: Empty 

31 

32.. autoclass:: Alternative 

33 

34.. autoclass:: Choice 

35 

36.. autoclass:: Option 

37 :members: 

38 

39.. autoclass:: List 

40 

41.. autoclass:: Tuple 

42 

43.. autoclass:: File 

44 

45.. autoclass:: Dir 

46 

47 

48Implementing your own completer 

49------------------------------- 

50 

51To implement a custom completer, subclass :class:`Completer` and implement 

52its :meth:`~Completer._process` method. 

53 

54.. note:: 

55 

56 When using a custom completer for CLI flags in :mod:`yuio.app`, 

57 completion script will invoke your program with special arguments 

58 to run the completer and get its result. 

59 

60.. class:: Completer 

61 :noindex: 

62 

63 .. automethod:: _process 

64 

65.. autoclass:: CompletionCollector 

66 

67""" 

68 

69from __future__ import annotations 

70 

71import abc 

72import argparse 

73import contextlib 

74import dataclasses 

75import enum 

76import functools 

77import json 

78import math 

79import os 

80import pathlib 

81import re 

82import shutil 

83import string 

84import subprocess 

85import sys 

86import typing 

87from dataclasses import dataclass 

88 

89import yuio 

90import yuio.string 

91from yuio import _typing as _t 

92 

93__all__ = [ 

94 "Alternative", 

95 "Choice", 

96 "Completer", 

97 "Completion", 

98 "CompletionCollector", 

99 "Dir", 

100 "Empty", 

101 "File", 

102 "List", 

103 "Option", 

104 "Tuple", 

105] 

106 

107 

108@dataclass(frozen=True, slots=True) 

109@functools.total_ordering 

110class Completion: 

111 """ 

112 A single completion. 

113 

114 """ 

115 

116 iprefix: str 

117 """ 

118 See :class:`CompletionCollector.iprefix` for details. 

119 

120 """ 

121 

122 completion: str 

123 """ 

124 Text of the completion. 

125 

126 """ 

127 

128 rsuffix: str 

129 """ 

130 See :class:`CompletionCollector.rsuffix` for details. 

131 

132 """ 

133 

134 rsymbols: str 

135 """ 

136 See :class:`CompletionCollector.rsymbols` for details. 

137 

138 """ 

139 

140 isuffix: str 

141 """ 

142 See :class:`CompletionCollector.isuffix` for details. 

143 

144 """ 

145 

146 comment: str | None 

147 """ 

148 Short comment displayed alongside the completion. 

149 

150 """ 

151 

152 dprefix: str 

153 """ 

154 Prefix that will be displayed before :attr:`~Completion.completion` 

155 when listing completions, but will not be inserted once completion 

156 is applied. 

157 

158 """ 

159 

160 dsuffix: str 

161 """ 

162 Like :attr:`~Completion.dprefix`, but it's a suffix. 

163 

164 """ 

165 

166 group_id: _t.SupportsLt[_t.Any] = dataclasses.field(repr=False) 

167 """ 

168 Group id, used to sort completions. 

169 

170 Actual content of this property is an implementation detail. 

171 

172 """ 

173 

174 group_color_tag: str | None 

175 """ 

176 Color tag that's used when displaying this completion. 

177 

178 See :meth:`CompletionCollector.add_group` for details. 

179 

180 """ 

181 

182 def __lt__(self, other: Completion) -> bool: 

183 """ 

184 Completions are ordered by their groups and then alphabetically. 

185 

186 """ 

187 

188 return self.group_id < other.group_id or ( 

189 self.group_id == other.group_id and self.completion < other.completion 

190 ) 

191 

192 

193@dataclass(init=False, eq=False, repr=False, match_args=False) 

194class CompletionCollector: 

195 """ 

196 A class that collects completions as completers are running. 

197 

198 The text that is being completed is split into four parts, similar 

199 to what you might see in ZSH completion widgets. The main two are: 

200 

201 .. autoattribute:: prefix 

202 

203 .. autoattribute:: suffix 

204 

205 When completions are added to the collector, they are checked against 

206 the current prefix to determine if they match the entered text. If they 

207 do, the completion system will replace text from ``prefix`` and ``suffix`` 

208 with the new completion string. 

209 

210 The two additional parts are: 

211 

212 .. autoattribute:: iprefix 

213 

214 .. autoattribute:: isuffix 

215 

216 For example, suppose you're completing a second element 

217 of a colon-separated list. The list completer will set up 

218 the collector so that ``prefix`` and ``suffix`` contain parts of the 

219 current list element, while ``iprefix`` and ``isuffix`` contain 

220 the rest of the elements: 

221 

222 .. code-block:: text 

223 

224 list_element_1:list_el|ement_2:list_element_3 

225 └┬────────────┘└┬────┘│└┬────┘└┬────────────┘ 

226 iprefix prefix │ suffix isuffix 

227 └ cursor 

228 

229 Now, if the completer adds a completion ``"list_elements"``, 

230 this text will replace the ``prefix`` and ``suffix``, but not ``iprefix`` 

231 and ``isuffix``. So, after the completion is applied, the string will 

232 look like so: 

233 

234 .. code-block:: text 

235 

236 list_element_1:list_elements:list_element_3 

237 └┬──────────┘ 

238 this got replaced 

239 

240 Finally, there is ``rsuffix``: 

241 

242 .. autoattribute:: rsuffix 

243 

244 .. autoattribute:: rsymbols 

245 

246 So, when completing a colon-separated list, colons will be added and removed 

247 automatically, similar to how ZSH does it. 

248 

249 .. autoattribute:: dedup_words 

250 

251 .. autoattribute:: full_prefix 

252 

253 .. autoattribute:: full_suffix 

254 

255 .. autoattribute:: text 

256 

257 .. autoattribute:: num_completions 

258 

259 .. automethod:: add 

260 

261 .. automethod:: add_group 

262 

263 .. automethod:: save_state 

264 

265 .. automethod:: split_off_prefix 

266 

267 .. automethod:: split_off_suffix 

268 

269 .. automethod:: finalize 

270 

271 """ 

272 

273 iprefix: str 

274 """ 

275 Contains text that goes before the :attr:`~CompletionCollector.prefix`. 

276 

277 This prefix is not considered when checking whether a completion 

278 matches a text, and it is not replaced by the completion. It will also 

279 not be shown in the table of completions. 

280 

281 This prefix starts empty, and then parts of :attr:`~CompletionCollector.prefix` 

282 are moved to :attr:`~CompletionCollector.iprefix` as completers split it into 

283 list elements. 

284 

285 """ 

286 

287 prefix: str 

288 """ 

289 Portion of the completed text before the cursor. 

290 

291 """ 

292 

293 suffix: str 

294 """ 

295 Portion of the completed text after the cursor. 

296 

297 """ 

298 

299 rsuffix: str 

300 """ 

301 Starts empty, and may be set to hold a list separator. 

302 

303 This suffix will be added after the completion. However, it will be automatically 

304 removed if the user types one of :attr:`CompletionCollector.rsymbols`, 

305 or moves cursor, or alters input in some other way. 

306 

307 This property is mutable and can be changed by completers. 

308 

309 """ 

310 

311 rsymbols: str 

312 """ 

313 If user types one of the symbols from this string, 

314 :attr:`~.CompletionCollector.rsuffix` will be removed. 

315 

316 This property is mutable and can be changed by completers. 

317 

318 """ 

319 

320 isuffix: str 

321 """ 

322 Similar to :attr:`CompletionCollector.iprefix`, but for suffixes. 

323 

324 """ 

325 

326 dedup_words: frozenset[str] 

327 """ 

328 Completions from this set will not be added. This is useful 

329 when completing lists of unique values. 

330 

331 This property is mutable and can be changed by completers. 

332 

333 """ 

334 

335 # Internal fields. 

336 _group_id: int 

337 _group_sorted: bool 

338 _group_color_tag: str | None 

339 

340 def __init__(self, text: str, pos: int, /): 

341 self.iprefix = "" 

342 self.prefix = text[:pos] 

343 self.suffix = text[pos:] 

344 self.rsuffix = "" 

345 self.rsymbols = "" 

346 self.isuffix = "" 

347 self.dedup_words = frozenset() 

348 

349 self._group_id = 0 

350 self._group_sorted = True 

351 self._group_color_tag = None 

352 

353 self._completions: list[Completion] = [] 

354 

355 @property 

356 def full_prefix(self) -> str: 

357 """ 

358 Portion of the final completed text that goes before the cursor. 

359 

360 """ 

361 

362 return self.iprefix + self.prefix 

363 

364 @property 

365 def full_suffix(self) -> str: 

366 """ 

367 Portion of the final completed text that goes after the cursor. 

368 

369 """ 

370 

371 return self.suffix + self.isuffix 

372 

373 @property 

374 def text(self) -> str: 

375 """ 

376 Portion of the text that is being autocompleted. 

377 

378 """ 

379 

380 return self.prefix + self.suffix 

381 

382 @contextlib.contextmanager 

383 def save_state(self): 

384 """ 

385 Save current state of the collector, i.e. prefixes, 

386 suffixes, etc., upon entering this context manager, 

387 then restore state upon exiting. 

388 

389 Use this context manager when you need to call nested 

390 completers more than once to prevent changes made in 

391 one nested completer bleeding out into another 

392 nested completer. 

393 

394 """ 

395 

396 state = {f.name: getattr(self, f.name) for f in dataclasses.fields(self)} 

397 

398 try: 

399 yield 

400 finally: 

401 for name, value in state.items(): 

402 setattr(self, name, value) 

403 

404 def add( 

405 self, 

406 completion: str, 

407 /, 

408 *, 

409 comment: str | None = None, 

410 dprefix: str = "", 

411 dsuffix: str = "", 

412 color_tag: str | None = None, 

413 ): 

414 """ 

415 Add a new completion. 

416 

417 :param completion: 

418 completed text without :attr:`~CompletionCollector.iprefix` 

419 and :attr:`~CompletionCollector.isuffix`. This text will replace 

420 :attr:`~CompletionCollector.prefix` and :attr:`~CompletionCollector.suffix`. 

421 :param comment: 

422 additional comment that will be displayed near the completion. 

423 :param color_tag: 

424 allows overriding color tag from the group. 

425 

426 """ 

427 

428 if ( 

429 completion 

430 and completion not in self.dedup_words 

431 and completion.startswith(self.prefix) 

432 ): 

433 self._add( 

434 completion, 

435 comment=comment, 

436 dprefix=dprefix, 

437 dsuffix=dsuffix, 

438 color_tag=color_tag, 

439 ) 

440 

441 def _add( 

442 self, 

443 completion: str, 

444 /, 

445 *, 

446 comment: str | None = None, 

447 dprefix: str = "", 

448 dsuffix: str = "", 

449 color_tag: str | None = None, 

450 ): 

451 if not self.isuffix or self.isuffix[0] in string.whitespace: 

452 # Only add `rsuffix` if we're at the end of an array element. 

453 # Don't add `rsuffix` if we're in the middle of an array, unless the array 

454 # is separated by spaces. 

455 rsuffix = self.rsuffix 

456 rsymbols = self.rsymbols 

457 else: 

458 rsuffix = "" 

459 rsymbols = "" 

460 

461 if self._group_sorted: 

462 group_id = (self._group_id, 0) 

463 else: 

464 group_id = (self._group_id, len(self._completions)) 

465 

466 if color_tag is None: 

467 color_tag = self._group_color_tag 

468 

469 self._completions.append( 

470 Completion( 

471 iprefix=self.iprefix, 

472 completion=completion, 

473 rsuffix=rsuffix, 

474 rsymbols=rsymbols, 

475 isuffix=self.isuffix, 

476 comment=comment, 

477 dprefix=dprefix, 

478 dsuffix=dsuffix, 

479 group_id=group_id, 

480 group_color_tag=color_tag, 

481 ) 

482 ) 

483 

484 def add_group(self, /, *, sorted: bool = True, color_tag: str | None = None): 

485 """ 

486 Add a new completions group. 

487 

488 All completions added after call to this method will be placed to the new group. 

489 They will be grouped together, and colored according to the group's color tag. 

490 

491 :param sorted: 

492 controls whether completions in the new group 

493 should be sorted lexicographically. 

494 :param color_tag: 

495 which color tag should be used to display completions 

496 and their help messages for this group. 

497 

498 See :attr:`yuio.widget.Option.color_tag` for details. 

499 

500 """ 

501 

502 self._group_id += 1 

503 self._group_sorted = sorted 

504 self._group_color_tag = color_tag 

505 

506 @property 

507 def num_completions(self) -> int: 

508 """ 

509 Number of completions added so far. 

510 

511 """ 

512 

513 return len(self._completions) 

514 

515 def split_off_prefix(self, delim: str | None = None, /): 

516 """ 

517 Move everything up to the last occurrence of ``delim`` 

518 from :attr:`~CompletionCollector.prefix` 

519 to :attr:`~CompletionCollector.iprefix`. 

520 

521 :param delim: 

522 delimiter to split off; :data:`None` value splits off on any whitespace 

523 character, similar to :meth:`str.rsplit`. 

524 

525 """ 

526 

527 delim = delim or " " 

528 parts = self.prefix.rsplit(delim, maxsplit=1) 

529 if len(parts) > 1: 

530 self.iprefix += parts[0] + delim 

531 self.prefix = parts[1] 

532 

533 def split_off_suffix(self, delim: str | None = None, /): 

534 """ 

535 Move everything past the first occurrence of ``delim`` 

536 from :attr:`~CompletionCollector.suffix` 

537 to :attr:`~CompletionCollector.isuffix`. 

538 

539 :param delim: 

540 delimiter to split off; :data:`None` value splits off on any whitespace 

541 character, similar to :meth:`str.split`. 

542 

543 """ 

544 

545 delim = delim or " " 

546 parts = self.suffix.split(delim, maxsplit=1) 

547 if len(parts) > 1: 

548 self.suffix = parts[0] 

549 self.isuffix = delim + parts[1] + self.isuffix 

550 

551 def finalize(self) -> list[Completion]: 

552 """ 

553 Finish collecting completions and return everything that was collected. 

554 

555 Do not reuse a collector after it was finalized. 

556 

557 :returns: 

558 list of completions, sorted by their groups and preferred ordering 

559 within each group. 

560 

561 If all completions start with a common prefix, a single completion 

562 is returned containing this prefix. 

563 

564 """ 

565 

566 if len(self._completions) > 1: 

567 c0 = self._completions[0] 

568 

569 iprefix = c0.iprefix 

570 isuffix = c0.isuffix 

571 

572 if ( 

573 self.full_prefix.startswith(iprefix) 

574 and self.full_suffix.endswith(isuffix) 

575 and all( 

576 c.iprefix == iprefix and c.isuffix == isuffix 

577 for c in self._completions 

578 ) 

579 ): 

580 # If all completions have the same `iprefix` and `isuffix`... 

581 common_prefix = _commonprefix( 

582 list(c.completion for c in self._completions) 

583 ) 

584 if common_prefix and len(iprefix) + len(common_prefix) > len( 

585 self.iprefix 

586 ) + len(self.prefix): 

587 # ...and they have a common prefix that is longer than what's entered so far, 

588 # then complete this common prefix. 

589 rsuffix = "" 

590 rsymbols = "" 

591 if all( 

592 common_prefix == c.completion and rsuffix == c.rsuffix 

593 for c in self._completions 

594 ): 

595 # If completing common prefix actually fulfills a completion, add `rsuffix` as well. 

596 rsuffix = c0.rsuffix 

597 rsymbols = c0.rsymbols 

598 return [ 

599 Completion( 

600 iprefix=iprefix, 

601 completion=common_prefix, 

602 rsuffix=rsuffix, 

603 rsymbols=rsymbols, 

604 isuffix=isuffix, 

605 comment=None, 

606 dprefix="", 

607 dsuffix="", 

608 group_id=(0, 0), 

609 group_color_tag=None, 

610 ) 

611 ] 

612 

613 self._completions.sort() 

614 return self._completions 

615 

616 

617_MAX_COMPLETION_CORRECTIONS: int = 1 

618_MAX_COMPLETION_CORRECTIONS_RATE: float = 1 / 3 

619 

620 

621@_t.final 

622class _CorrectingCollector(CompletionCollector): 

623 def __init__(self, text: str, pos: int): 

624 super().__init__(text, pos) 

625 

626 self._has_corrections = False 

627 

628 def add( 

629 self, 

630 completion: str, 

631 /, 

632 *, 

633 comment: str | None = None, 

634 dprefix: str = "", 

635 dsuffix: str = "", 

636 color_tag: str | None = None, 

637 ): 

638 if not completion or completion in self.dedup_words: 

639 return 

640 

641 a = self.prefix + self.suffix 

642 b = completion 

643 corrections = _corrections(a, b) 

644 threshold = ( 

645 _MAX_COMPLETION_CORRECTIONS 

646 + _MAX_COMPLETION_CORRECTIONS_RATE * (len(a) + len(b)) / 2 

647 ) 

648 

649 if corrections <= 1: 

650 # this is a simple mistype, add it as usual 

651 self._add( 

652 completion, 

653 comment=comment, 

654 dprefix=dprefix, 

655 dsuffix=dsuffix, 

656 color_tag=color_tag, 

657 ) 

658 elif corrections <= threshold: 

659 # this is a correction, add it into corrections group 

660 if comment: 

661 comment = "corrected: " + comment 

662 else: 

663 comment = "corrected" 

664 with self.save_state(): 

665 self._group_id = 0xFFFFFFFE # (big enough) - 1 

666 self._group_color_tag = "corrected" 

667 self._add( 

668 completion, 

669 comment=comment, 

670 dprefix=dprefix, 

671 dsuffix=dsuffix, 

672 color_tag=color_tag, 

673 ) 

674 self._has_corrections = True 

675 

676 def finalize(self) -> list[Completion]: 

677 if self._has_corrections: 

678 c0 = self._completions[0] 

679 

680 iprefix = "" 

681 prefix = self.full_prefix 

682 suffix = self.full_suffix 

683 isuffix = "" 

684 

685 if prefix.startswith(c0.iprefix): 

686 l = len(c0.iprefix) 

687 iprefix = prefix[:l] 

688 prefix = prefix[l:] 

689 

690 if suffix.endswith(c0.isuffix): 

691 l = len(c0.isuffix) 

692 isuffix = suffix[-l:] 

693 suffix = suffix[:-l] 

694 

695 # If we have corrections, add original value to the end. 

696 with self.save_state(): 

697 self._group_id = 0xFFFFFFFF # (big enough) 

698 self._group_color_tag = "original" 

699 self.iprefix = iprefix 

700 self.isuffix = isuffix 

701 self._add(prefix + suffix, comment="original") 

702 

703 self._completions.sort() 

704 return self._completions 

705 

706 

707def _corrections(a: str, b: str) -> float: 

708 # Damerau–Levenshtein distance (Optimal String Alignment distance) 

709 

710 a = a.casefold() 

711 b = b.casefold() 

712 d = [x[:] for x in [[0.0] * (len(b) + 1)] * (len(a) + 1)] 

713 for i in range(len(a) + 1): 

714 d[i][0] = i 

715 for j in range(len(b) + 1): 

716 d[0][j] = j 

717 for i in range(1, len(a) + 1): 

718 for j in range(1, len(b) + 1): 

719 d[i][j] = min( 

720 # Add to `a`: 

721 d[i - 1][j] + 1, 

722 # Add to `b`: 

723 d[i][j - 1] + 1, 

724 # Replace: 

725 d[i - 1][j - 1] + (a[i - 1] != b[j - 1]), 

726 # Transpose: 

727 ( 

728 d[i - 2][j - 2] + (a[i - 1] != b[j - 1]) 

729 if i > 2 and j > 2 and a[i - 2 : i] == b[j - 1 : j - 3 : -1] 

730 else math.inf 

731 ), 

732 ) 

733 

734 return d[-1][-1] 

735 

736 

737def _commonprefix(m: list[str]) -> str: 

738 if not m: 

739 return "" 

740 s1 = min(m) 

741 s2 = max(m) 

742 for i, c in enumerate(s1): 

743 if c != s2[i]: 

744 return s1[:i] 

745 return s1 

746 

747 

748class Completer(abc.ABC): 

749 """ 

750 An interface for text completion providers. 

751 

752 """ 

753 

754 def complete( 

755 self, text: str, pos: int, /, *, do_corrections: bool = True 

756 ) -> list[Completion]: 

757 """ 

758 Complete the given text at the given cursor position. 

759 

760 :param text: 

761 text that is being completed. 

762 :param pos: 

763 position of the cursor in the text. ``0`` means the cursor 

764 is before the first character, ``len(text)`` means the cursor 

765 is after the last character. 

766 :param do_corrections: 

767 if :data:`True` (default), completion system will try to guess 

768 if there are any misspells in the ``text``, and offer to correct them. 

769 :returns: 

770 a sorted list of completions. 

771 

772 If all completions start with a common prefix, a single completion 

773 is returned containing this prefix. 

774 

775 """ 

776 

777 collector = CompletionCollector(text, pos) 

778 with collector.save_state(): 

779 self._process(collector) 

780 completions = collector.finalize() 

781 if completions or not do_corrections: 

782 return completions 

783 

784 collector = _CorrectingCollector(text, pos) 

785 with collector.save_state(): 

786 self._process(collector) 

787 return collector.finalize() 

788 

789 @abc.abstractmethod 

790 def _process(self, collector: CompletionCollector, /): 

791 """ 

792 Generate completions and add them to the given collector. 

793 

794 Implementing this class is straight forward, just feed all possible 

795 completions to the collector. For example, let's implement a completer 

796 for environment variables: 

797 

798 .. code-block:: python 

799 

800 class EnvVarCompleter(Completer): 

801 def _process(self, collector: CompletionCollector): 

802 for var in os.environ.keys(): 

803 collector.add(var) 

804 

805 """ 

806 

807 raise NotImplementedError() 

808 

809 def _get_completion_model( 

810 self, *, is_many: bool = False 

811 ) -> _CompleterSerializer.Model: 

812 """ 

813 Internal, do not use. 

814 

815 """ 

816 

817 return _CompleterSerializer.CustomCompleter(self) 

818 

819 

820class Empty(Completer): 

821 """ 

822 An empty completer that returns no values. 

823 

824 """ 

825 

826 def _process(self, collector: CompletionCollector): 

827 pass # nothing to do 

828 

829 def _get_completion_model( 

830 self, *, is_many: bool = False 

831 ) -> _CompleterSerializer.Model: 

832 return _CompleterSerializer.Model() 

833 

834 

835@dataclass(frozen=True, slots=True) 

836class Option: 

837 """ 

838 A single completion option for the :class:`Choice` completer. 

839 

840 """ 

841 

842 completion: str 

843 """ 

844 This string will replace an element that is being completed. 

845 

846 """ 

847 

848 comment: str | None = None 

849 """ 

850 Short comment displayed alongside the completion. 

851 

852 """ 

853 

854 

855class Choice(Completer): 

856 """ 

857 Completes input from a predefined list of completions. 

858 

859 :param choices: 

860 options to choose completion from. 

861 

862 """ 

863 

864 def __init__(self, choices: _t.Collection[Option], /): 

865 self._choices: _t.Collection[Option] = choices 

866 

867 def _process(self, collector: CompletionCollector, /): 

868 for choice in self._choices: 

869 collector.add(choice.completion, comment=choice.comment) 

870 

871 def _get_completion_model( 

872 self, *, is_many: bool = False 

873 ) -> _CompleterSerializer.Model: 

874 if any(option.comment for option in self._choices): 

875 return _CompleterSerializer.ChoiceWithDesc( 

876 [(option.completion, option.comment or "") for option in self._choices] 

877 ) 

878 else: 

879 return _CompleterSerializer.Choice( 

880 [option.completion for option in self._choices] 

881 ) 

882 

883 

884class Alternative(Completer): 

885 """ 

886 Joins outputs from multiple completers. 

887 

888 :param completers: 

889 list of inner completers. 

890 

891 This is a list of tuples. First tuple element is a description of a completion 

892 group. It will be displayed when this completer is used in shells 

893 that support it (namely, ZSH). Second tuple element is the inner completer 

894 itself. 

895 

896 """ 

897 

898 def __init__(self, completers: list[tuple[str, Completer]], /): 

899 self._completers = completers 

900 

901 def _process(self, collector: CompletionCollector, /): 

902 for _, completer in self._completers: 

903 with collector.save_state(): 

904 collector.add_group() 

905 completer._process(collector) 

906 

907 def _get_completion_model( 

908 self, *, is_many: bool = False 

909 ) -> _CompleterSerializer.Model: 

910 return _CompleterSerializer.Alternative( 

911 [ 

912 (name, completer._get_completion_model(is_many=is_many)) 

913 for name, completer in self._completers 

914 ] 

915 ) 

916 

917 

918class List(Completer): 

919 """ 

920 Completes a value-separated list of elements. 

921 

922 :param inner: 

923 completer for list items. 

924 :param delimiter: 

925 a character that separates list items. :data:`None` separates by any whitespace 

926 character, similar to :meth:`str.split`. 

927 :param allow_duplicates: 

928 whether to show completions that already appear in the list. 

929 

930 """ 

931 

932 def __init__( 

933 self, 

934 inner: Completer, 

935 /, 

936 *, 

937 delimiter: str | None = None, 

938 allow_duplicates: bool = False, 

939 ): 

940 self._inner = inner 

941 if delimiter == "": 

942 raise ValueError("empty delimiter") 

943 self._delimiter = delimiter 

944 self._allow_duplicates = allow_duplicates 

945 

946 def _process(self, collector: CompletionCollector, /): 

947 collector.split_off_prefix(self._delimiter) 

948 collector.split_off_suffix(self._delimiter) 

949 collector.rsuffix = self._delimiter or " " 

950 collector.rsymbols += self._delimiter or string.whitespace 

951 

952 if not self._allow_duplicates: 

953 dedup_words = set( 

954 collector.iprefix.split(self._delimiter) 

955 + collector.isuffix.split(self._delimiter) 

956 ) 

957 if collector.text in dedup_words: 

958 dedup_words.remove(collector.text) 

959 collector.dedup_words = frozenset(dedup_words) 

960 else: 

961 collector.dedup_words = frozenset() 

962 

963 self._inner._process(collector) 

964 

965 def _get_completion_model( 

966 self, *, is_many: bool = False 

967 ) -> _CompleterSerializer.Model: 

968 if is_many: 

969 return _CompleterSerializer.ListMany( 

970 self._delimiter or " ", self._inner._get_completion_model() 

971 ) 

972 else: 

973 return _CompleterSerializer.List( 

974 self._delimiter or " ", self._inner._get_completion_model() 

975 ) 

976 

977 

978class Tuple(Completer): 

979 """ 

980 Completes a value-separated tuple of elements. 

981 

982 :param inner: 

983 completers for each tuple element. 

984 :param delimiter: 

985 a character that separates list items. :data:`None` separates by any whitespace 

986 character, similar to :meth:`str.split`. 

987 

988 """ 

989 

990 def __init__(self, *inner: Completer, delimiter: str | None = None): 

991 self._inner = inner 

992 if delimiter == "": 

993 raise ValueError("empty delimiter") 

994 self._delimiter = delimiter 

995 

996 def _process(self, collector: CompletionCollector, /): 

997 pos = len(collector.prefix.split(self._delimiter)) 

998 if ( 

999 pos 

1000 and self._delimiter is None 

1001 and collector.prefix 

1002 and collector.prefix[-1] in string.whitespace 

1003 ): 

1004 # `.split(None)` will trim whitespaces at the end. 

1005 # Make sure we count those towards the current position in the tuple. 

1006 pos += 1 

1007 if pos > len(self._inner): 

1008 return 

1009 if pos > 0: 

1010 pos -= 1 

1011 

1012 collector.split_off_prefix(self._delimiter) 

1013 collector.split_off_suffix(self._delimiter) 

1014 collector.rsuffix = self._delimiter or " " 

1015 collector.rsymbols += self._delimiter or string.whitespace 

1016 

1017 self._inner[pos]._process(collector) 

1018 

1019 def _get_completion_model( 

1020 self, *, is_many: bool = False 

1021 ) -> _CompleterSerializer.Model: 

1022 if is_many: 

1023 return _CompleterSerializer.TupleMany( 

1024 self._delimiter or " ", 

1025 [inner._get_completion_model() for inner in self._inner], 

1026 ) 

1027 else: 

1028 return _CompleterSerializer.Tuple( 

1029 self._delimiter or " ", 

1030 [inner._get_completion_model() for inner in self._inner], 

1031 ) 

1032 

1033 

1034class File(Completer): 

1035 """ 

1036 Completes file paths. 

1037 

1038 :param extensions: 

1039 allowed file extensions, should include the leading dot. 

1040 

1041 """ 

1042 

1043 def __init__(self, extensions: str | _t.Collection[str] | None = None): 

1044 if isinstance(extensions, str): 

1045 self._extensions = [extensions] 

1046 elif extensions is not None: 

1047 self._extensions = list(extensions) 

1048 else: 

1049 self._extensions = None 

1050 

1051 def _process(self, collector: CompletionCollector, /): 

1052 base, name = os.path.split(collector.prefix) 

1053 if base and not base.endswith(os.path.sep): 

1054 base += os.path.sep 

1055 collector.iprefix += base 

1056 collector.prefix = name 

1057 collector.suffix = collector.suffix.split(os.sep, maxsplit=1)[0] 

1058 resolved = pathlib.Path(base).expanduser().resolve() 

1059 rsuffix = collector.rsuffix 

1060 if resolved.is_dir(): 

1061 if name.startswith("."): 

1062 collector.rsuffix = "" 

1063 collector.add(os.path.curdir + os.path.sep, color_tag="dir") 

1064 collector.add(os.path.pardir + os.path.sep, color_tag="dir") 

1065 if name.startswith("~"): 

1066 collector.rsuffix = "" 

1067 collector.add("~" + os.path.sep, color_tag="dir") 

1068 try: 

1069 for path in resolved.iterdir(): 

1070 if path.is_dir(): 

1071 if path.is_symlink(): 

1072 color_tag = "symlink" 

1073 dsuffix = "@" 

1074 else: 

1075 color_tag = "dir" 

1076 dsuffix = "" 

1077 collector.rsuffix = "" 

1078 collector.add( 

1079 path.name + os.sep, color_tag=color_tag, dsuffix=dsuffix 

1080 ) 

1081 elif self._extensions is None or any( 

1082 path.name.endswith(ext) for ext in self._extensions 

1083 ): 

1084 collector.rsuffix = rsuffix 

1085 color_tag = None 

1086 dsuffix = "" 

1087 if path.is_symlink(): 

1088 color_tag = "symlink" 

1089 dsuffix = "@" 

1090 elif path.is_file(): 

1091 if (os.name != "nt" and os.access(path, os.X_OK)) or ( 

1092 os.name == "nt" and path.suffix == ".exe" 

1093 ): 

1094 color_tag = "exec" 

1095 dsuffix = "*" 

1096 else: 

1097 color_tag = "file" 

1098 elif path.is_socket(): 

1099 color_tag = "socket" 

1100 dsuffix = "=" 

1101 elif path.is_fifo(): 

1102 color_tag = "pipe" 

1103 dsuffix = "|" 

1104 elif path.is_block_device(): 

1105 color_tag = "block_device" 

1106 dsuffix = "#" 

1107 elif path.is_char_device(): 

1108 color_tag = "char_device" 

1109 dsuffix = "%" 

1110 collector.add(path.name, color_tag=color_tag, dsuffix=dsuffix) 

1111 except PermissionError: 

1112 return 

1113 

1114 def _get_completion_model( 

1115 self, *, is_many: bool = False 

1116 ) -> _CompleterSerializer.Model: 

1117 return _CompleterSerializer.File( 

1118 "|".join(extension.lstrip(".") for extension in self._extensions or []) 

1119 ) 

1120 

1121 

1122class Dir(File): 

1123 """ 

1124 Completes directories. 

1125 

1126 """ 

1127 

1128 def __init__(self): 

1129 super().__init__([]) 

1130 

1131 def _get_completion_model( 

1132 self, *, is_many: bool = False 

1133 ) -> _CompleterSerializer.Model: 

1134 return _CompleterSerializer.Dir() 

1135 

1136 

1137class _CompleterSerializer: 

1138 def __init__( 

1139 self, 

1140 add_help: bool, 

1141 add_version: bool, 

1142 add_bug_report: bool, 

1143 path: str = "", 

1144 custom_completers: dict[tuple[str, str], list[Completer]] = {}, 

1145 ): 

1146 self._path = path 

1147 self._custom_completers = custom_completers 

1148 self._subcommands: dict[str, tuple[_CompleterSerializer, bool, str]] = {} 

1149 self._positional = 0 

1150 self._flags: list[ 

1151 tuple[ 

1152 list[str], 

1153 str | None, 

1154 str | tuple[str, ...] | None, 

1155 int | _t.Literal["-", "+", "*", "?"], 

1156 _CompleterSerializer.Model, 

1157 ] 

1158 ] = [] 

1159 self._add_help = add_help 

1160 if add_help: 

1161 self._flags.append( 

1162 ( 

1163 ["-h", "--help"], 

1164 "show help message and exit", 

1165 None, 

1166 "-", 

1167 _CompleterSerializer.Model(), 

1168 ) 

1169 ) 

1170 self._add_version = add_version 

1171 if add_version: 

1172 self._flags.append( 

1173 ( 

1174 ["-V", "--version"], 

1175 "show program version and exit", 

1176 None, 

1177 "-", 

1178 _CompleterSerializer.Model(), 

1179 ) 

1180 ) 

1181 self._add_bug_report = add_bug_report 

1182 if add_bug_report: 

1183 self._flags.append( 

1184 ( 

1185 ["--bug-report"], 

1186 "show environment data for bug report and exit", 

1187 None, 

1188 "-", 

1189 _CompleterSerializer.Model(), 

1190 ) 

1191 ) 

1192 

1193 def add_argument(self, *args: str, **kwargs): 

1194 if self._add_help and "--help" in args: 

1195 return 

1196 if self._add_version and "--version" in args: 

1197 return 

1198 if self._add_bug_report and "--bug-report" in args: 

1199 return 

1200 

1201 help = kwargs.get("help") or "" 

1202 

1203 if help == argparse.SUPPRESS: 

1204 return 

1205 

1206 help = yuio.string.strip_color_tags(help) 

1207 

1208 if all(not arg.startswith("-") for arg in args): 

1209 args = (str(self._positional),) 

1210 self._positional += 1 

1211 

1212 action = kwargs.get("action") 

1213 metavar = kwargs.get("metavar") or "" 

1214 nargs = kwargs.get( 

1215 "nargs", 

1216 ( 

1217 0 

1218 if action 

1219 in [ 

1220 "store_const", 

1221 "store_true", 

1222 "store_false", 

1223 "append_const", 

1224 "count", 

1225 "help", 

1226 "version", 

1227 ] 

1228 else 1 

1229 ), 

1230 ) 

1231 if get_parser := getattr(action, "get_parser", None): 

1232 parser = get_parser() 

1233 completer = parser.completer() 

1234 if completer is None: 

1235 completer = Empty() 

1236 completion_model = completer._get_completion_model( 

1237 is_many=parser.supports_parse_many() 

1238 ) 

1239 else: 

1240 completion_model = self.Model() 

1241 

1242 self._args = ";".join(args) 

1243 completion_model.collect(self) 

1244 

1245 self._flags.append((list(args), help, metavar, nargs, completion_model)) 

1246 

1247 def add_mutually_exclusive_group(self, *args, **kwargs): 

1248 return self 

1249 

1250 def add_argument_group(self, *args, **kwargs): 

1251 return self 

1252 

1253 def add_subparsers(self, *args, **kwargs): 

1254 return self 

1255 

1256 def add_parser( 

1257 self, 

1258 name: str, 

1259 *, 

1260 aliases: _t.Sequence[str] = (), 

1261 help: str, 

1262 **kwargs, 

1263 ): 

1264 if help != argparse.SUPPRESS: 

1265 help = yuio.string.strip_color_tags(str(help or "")) 

1266 serializer = _CompleterSerializer( 

1267 self._add_help, 

1268 self._add_version, 

1269 self._add_bug_report, 

1270 f"{self._path}/{name}", 

1271 self._custom_completers, 

1272 ) 

1273 self._subcommands[name] = (serializer, False, help) 

1274 for alias in aliases: 

1275 self._subcommands[alias] = (serializer, True, help) 

1276 return serializer 

1277 

1278 def register_custom_completer(self, completer: Completer) -> str: 

1279 completers = self._custom_completers.setdefault((self._path, self._args), []) 

1280 data = json.dumps([self._path, self._args, len(completers)]) 

1281 completers.append(completer) 

1282 return data 

1283 

1284 def get_custom_completer(self, data: str) -> Completer | None: 

1285 try: 

1286 path, args, index = json.loads(data) 

1287 return self._custom_completers[(path, args)][index] 

1288 except (json.JSONDecodeError, IndexError, TypeError, ValueError): 

1289 pass 

1290 return None 

1291 

1292 def as_parser(self) -> argparse.ArgumentParser: 

1293 # We've implemented all methods that `Config._setup_arg_parser` could call. 

1294 return _t.cast(argparse.ArgumentParser, self) 

1295 

1296 _SPECIAL_SYMBOLS = str.maketrans("\r\n\a\b\t", " ") 

1297 

1298 def _dump(self, path: str, result: list[str]): 

1299 if self._subcommands: 

1300 self._flags.append( 

1301 ( 

1302 ["c"], 

1303 "subcommand", 

1304 "<cmd>", 

1305 1, 

1306 _CompleterSerializer.ChoiceWithDesc( 

1307 [ 

1308 (name, help) 

1309 for name, (_, is_alias, help) in self._subcommands.items() 

1310 if not is_alias and help != argparse.SUPPRESS 

1311 ] 

1312 ), 

1313 ) 

1314 ) 

1315 

1316 for opts, desc, meta, nargs, completer in self._flags: 

1317 if not isinstance(meta, tuple): 

1318 meta = (meta,) 

1319 compspec: list[str] = [ 

1320 path, 

1321 " ".join(opts), 

1322 desc or "", 

1323 " ".join( 

1324 re.sub( 

1325 r"[\\ ]", 

1326 lambda s: "\\S" if s.group() == " " else "\\L", 

1327 str(m), 

1328 ) 

1329 or "" 

1330 for m in meta 

1331 ), 

1332 str(nargs), 

1333 *completer.dump(), 

1334 ] 

1335 

1336 result.append( 

1337 "\t".join(item.translate(self._SPECIAL_SYMBOLS) for item in compspec) 

1338 ) 

1339 

1340 for subcommand, (serializer, *_) in self._subcommands.items(): 

1341 serializer._dump(f"{path}/{subcommand}", result) 

1342 

1343 def _collect_nested(self, compspec: list[object]): 

1344 for item in compspec: 

1345 self._collect_nested_item(item) 

1346 

1347 def _collect_nested_item(self, item: object): 

1348 if isinstance(item, _CompleterSerializer.Model): 

1349 item.collect(self) 

1350 elif isinstance(item, (list, tuple)): 

1351 for sub_item in item: 

1352 self._collect_nested_item(sub_item) 

1353 

1354 @staticmethod 

1355 def _dump_nested(compspec: list[object]) -> list[str]: 

1356 contents = [] 

1357 

1358 for item in compspec: 

1359 contents.extend(_CompleterSerializer._dump_nested_item(item)) 

1360 

1361 return contents 

1362 

1363 @staticmethod 

1364 def _dump_nested_item(item: object) -> list[str]: 

1365 contents = [] 

1366 

1367 if isinstance(item, _CompleterSerializer.Model): 

1368 contents.extend(item.dump()) 

1369 elif isinstance(item, list): 

1370 contents.append(str(len(item))) 

1371 for sub_item in item: 

1372 contents.extend(_CompleterSerializer._dump_nested_item(sub_item)) 

1373 elif isinstance(item, tuple): 

1374 for sub_item in item: 

1375 contents.extend(_CompleterSerializer._dump_nested_item(sub_item)) 

1376 else: 

1377 contents.append(str(item)) 

1378 

1379 return contents 

1380 

1381 @dataclass 

1382 class ModelBase: 

1383 tag: typing.ClassVar[str] = "-" 

1384 

1385 def __init_subclass__(cls, tag: str = "-", **kwargs): 

1386 super().__init_subclass__(**kwargs) 

1387 cls.tag = tag 

1388 

1389 @dataclass 

1390 class Model(ModelBase): 

1391 def collect(self, s: _CompleterSerializer): 

1392 compspec = [getattr(self, field.name) for field in dataclasses.fields(self)] 

1393 s._collect_nested(compspec) 

1394 

1395 def dump(self) -> list[str]: 

1396 compspec = [getattr(self, field.name) for field in dataclasses.fields(self)] 

1397 contents = _CompleterSerializer._dump_nested(compspec) 

1398 return [self.tag, str(len(contents)), *contents] 

1399 

1400 @dataclass 

1401 class File(Model, tag="f"): 

1402 ext: str 

1403 

1404 @dataclass 

1405 class Dir(Model, tag="d"): 

1406 pass 

1407 

1408 @dataclass 

1409 class Choice(Model, tag="c"): 

1410 choices: list[str] 

1411 

1412 def dump(self) -> list[str]: 

1413 return [self.tag, str(len(self.choices)), *self.choices] 

1414 

1415 @dataclass 

1416 class ChoiceWithDesc(Model, tag="cd"): 

1417 choices: list[tuple[str, str]] 

1418 

1419 def dump(self) -> list[str]: 

1420 return [ 

1421 self.tag, 

1422 str(len(self.choices) * 2), 

1423 *[c[0] for c in self.choices], 

1424 *[yuio.string.strip_color_tags(c[1]) for c in self.choices], 

1425 ] 

1426 

1427 @dataclass 

1428 class Git(Model, tag="g"): 

1429 class Mode(enum.Enum): 

1430 Branch = "b" 

1431 Remote = "r" 

1432 Tag = "t" 

1433 Head = "h" 

1434 

1435 modes: set[Mode] = dataclasses.field( 

1436 default_factory=lambda: { 

1437 _CompleterSerializer.Git.Mode.Branch, 

1438 _CompleterSerializer.Git.Mode.Tag, 

1439 _CompleterSerializer.Git.Mode.Head, 

1440 } 

1441 ) 

1442 

1443 def dump(self) -> list[str]: 

1444 return [self.tag, "1", "".join(mode.value for mode in self.modes)] 

1445 

1446 @dataclass 

1447 class List(Model, tag="l"): 

1448 delim: str 

1449 inner: _CompleterSerializer.Model 

1450 

1451 @dataclass 

1452 class ListMany(List, tag="lm"): 

1453 pass 

1454 

1455 @dataclass 

1456 class Tuple(Model, tag="t"): 

1457 delim: str 

1458 inner: list[_CompleterSerializer.Model] 

1459 

1460 @dataclass 

1461 class TupleMany(Tuple, tag="tm"): 

1462 pass 

1463 

1464 @dataclass 

1465 class Alternative(Model, tag="a"): 

1466 alternatives: list[tuple[str, _CompleterSerializer.Model]] 

1467 

1468 @dataclass 

1469 class CustomCompleter(Model, tag="cc"): 

1470 completer: Completer 

1471 _data: str | None = None 

1472 

1473 def collect(self, s: _CompleterSerializer): 

1474 self._data = s.register_custom_completer(self.completer) 

1475 

1476 def dump(self) -> list[str]: 

1477 assert self._data is not None 

1478 return [ 

1479 self.tag, 

1480 "1", 

1481 self._data, 

1482 ] 

1483 

1484 

1485def _run_custom_completer(s: _CompleterSerializer, data: str, word: str): 

1486 completer = s.get_custom_completer(data) 

1487 if completer is None: 

1488 return 

1489 completions = completer.complete(word, len(word), do_corrections=False) 

1490 for completion in completions: 

1491 print( 

1492 f"{completion.iprefix}{completion.completion}{completion.isuffix}\t{completion.comment or ''}", 

1493 file=sys.__stdout__, 

1494 ) 

1495 

1496 

1497_PROG_ESCAPE = str.maketrans( 

1498 string.punctuation + string.whitespace, 

1499 "_" * (len(string.punctuation) + len(string.whitespace)), 

1500) 

1501 

1502 

1503def _write_completions( 

1504 s: _CompleterSerializer, prog: str | None = None, shell: str = "all" 

1505): 

1506 import yuio.io 

1507 

1508 true_prog = prog or pathlib.Path(sys.argv[0]).stem 

1509 prog = (prog or pathlib.Path(sys.argv[0]).stem).translate(_PROG_ESCAPE) 

1510 

1511 if pathlib.Path(sys.argv[0]).stem == "__main__": 

1512 yuio.io.failure( 

1513 "You've invoked this program as a python module, most likely with " 

1514 "`python -m <module>`. For completions to work, the program " 

1515 "must be invoked as a command in your `$PATH`" 

1516 ) 

1517 sys.exit(1) 

1518 if not prog: 

1519 yuio.io.failure("Failed to generate completion because program name is empty") 

1520 sys.exit(1) 

1521 if not re.match(r"^[a-zA-Z0-9_-]+$", prog): 

1522 yuio.io.failure( 

1523 "Failed to generate completion due to " 

1524 "forbidden characters in program name: `%r`", 

1525 prog, 

1526 ) 

1527 sys.exit(1) 

1528 

1529 if shell == "uninstall": 

1530 shell = "all" 

1531 yuio.io.heading("Uninstalling completions for `%s`", true_prog) 

1532 install = False 

1533 else: 

1534 yuio.io.heading("Generating completions for `%s`", true_prog) 

1535 install = True 

1536 

1537 if not shutil.which(true_prog): 

1538 yuio.io.warning( 

1539 "Program `%s` is not in your `$PATH`. Completions might not be able " 

1540 "to initialize", 

1541 true_prog, 

1542 ) 

1543 

1544 if os.name == "nt": 

1545 data_home = cache_home = config_home = pathlib.Path( 

1546 os.environ.get("LOCALAPPDATA") or (pathlib.Path.home() / "AppData/Local") 

1547 ) 

1548 else: 

1549 data_home = pathlib.Path( 

1550 os.environ.get("XDG_DATA_HOME") or (pathlib.Path.home() / ".local/share") 

1551 ) 

1552 cache_home = pathlib.Path( 

1553 os.environ.get("XDG_CACHE_HOME") or (pathlib.Path.home() / ".cache") 

1554 ) 

1555 config_home = pathlib.Path( 

1556 os.environ.get("XDG_CONFIG_HOME") or (pathlib.Path.home() / ".config") 

1557 ) 

1558 

1559 compdata_path = data_home / f"yuio/{prog}.compdata.tsv" 

1560 

1561 if install: 

1562 task_heading = "Installing completions" 

1563 else: 

1564 task_heading = "Uninstalling completions" 

1565 

1566 with yuio.io.Task(task_heading) as t: 

1567 if install: 

1568 result = [] 

1569 s._dump("", result) 

1570 compdata = "\n".join(result) 

1571 

1572 os.makedirs(data_home / "yuio", exist_ok=True) 

1573 compdata_path.write_text(compdata) 

1574 yuio.io.info( 

1575 "Wrote <c note>completion data</c> to <c path>%s</c>", compdata_path 

1576 ) 

1577 elif compdata_path.exists(): 

1578 os.remove(compdata_path) 

1579 yuio.io.info("Removed <c path>%s</c>", compdata_path) 

1580 

1581 if shell in ["all", "bash"]: 

1582 t.comment("Bash") 

1583 _write_bash_script( 

1584 prog, 

1585 true_prog, 

1586 install, 

1587 compdata_path, 

1588 data_home, 

1589 cache_home, 

1590 config_home, 

1591 ) 

1592 if shell in ["all", "zsh"]: 

1593 t.comment("Zsh") 

1594 _write_zsh_script( 

1595 prog, 

1596 true_prog, 

1597 install, 

1598 compdata_path, 

1599 data_home, 

1600 cache_home, 

1601 config_home, 

1602 ) 

1603 if shell in ["all", "fish"]: 

1604 t.comment("Fish") 

1605 _write_fish_script( 

1606 prog, 

1607 true_prog, 

1608 install, 

1609 compdata_path, 

1610 data_home, 

1611 cache_home, 

1612 config_home, 

1613 ) 

1614 if shell in ["all", "pwsh"]: 

1615 t.comment("PowerShell") 

1616 _write_pwsh_script( 

1617 prog, 

1618 true_prog, 

1619 install, 

1620 compdata_path, 

1621 data_home, 

1622 cache_home, 

1623 config_home, 

1624 ) 

1625 

1626 yuio.io.success("All done! Please restart your shell for changes to take effect.") 

1627 if install: 

1628 yuio.io.info("Run `%s --completions uninstall` to undo all changes.", prog) 

1629 

1630 

1631def _write_bash_script( 

1632 prog: str, 

1633 true_prog: str, 

1634 install: bool, 

1635 compdata_path: pathlib.Path, 

1636 data_home: pathlib.Path, 

1637 cache_home: pathlib.Path, 

1638 config_home: pathlib.Path, 

1639): 

1640 import yuio.exec 

1641 import yuio.io 

1642 

1643 if os.name == "nt": 

1644 yuio.io.warning( 

1645 "Skipped <c note>Bash</c>: completion script doesn't support windows" 

1646 ) 

1647 return 

1648 

1649 try: 

1650 bash_completions_home = yuio.exec.exec( 

1651 "bash", 

1652 "-lc", 

1653 'echo -n "${BASH_COMPLETION_USER_DIR:-${XDG_DATA_HOME:-$HOME/.local/share}/bash-completion}/completions/"', 

1654 ).splitlines()[-1] 

1655 except (subprocess.CalledProcessError, IndexError): 

1656 bash_completions_home = data_home / "bash-completion/completions/" 

1657 bash_completions_home = pathlib.Path(bash_completions_home) 

1658 script_dest = bash_completions_home / true_prog 

1659 

1660 if install: 

1661 _write_script(script_dest, "complete.bash", prog, true_prog, compdata_path) 

1662 yuio.io.info("Wrote <c note>Bash</c> script to <c path>%s</c>", script_dest) 

1663 elif script_dest.exists(): 

1664 os.remove(script_dest) 

1665 yuio.io.info("Removed <c path>%s</c>", script_dest) 

1666 

1667 

1668def _write_zsh_script( 

1669 prog: str, 

1670 true_prog: str, 

1671 install: bool, 

1672 compdata_path: pathlib.Path, 

1673 data_home: pathlib.Path, 

1674 cache_home: pathlib.Path, 

1675 config_home: pathlib.Path, 

1676): 

1677 import yuio.exec 

1678 import yuio.io 

1679 

1680 if os.name == "nt": 

1681 yuio.io.warning( 

1682 "Skipped <c note>Zsh</c>: completion script doesn't support windows" 

1683 ) 

1684 return 

1685 

1686 needs_cache_cleanup = False 

1687 

1688 zsh_completions_home = data_home / "zsh/completions" 

1689 script_dest = zsh_completions_home / ("_" + true_prog) 

1690 

1691 if install: 

1692 needs_cache_cleanup = True 

1693 _write_script(script_dest, "complete.zsh", prog, true_prog, compdata_path) 

1694 yuio.io.info("Wrote <c note>Zsh</c> script to <c path>%s</c>", script_dest) 

1695 elif script_dest.exists(): 

1696 needs_cache_cleanup = True 

1697 

1698 os.remove(script_dest) 

1699 yuio.io.info("Removed <c path>%s</c>", script_dest) 

1700 

1701 try: 

1702 fpath = ( 

1703 yuio.exec.exec( 

1704 "zsh", 

1705 "-lc", 

1706 "echo -n $FPATH", 

1707 ) 

1708 .splitlines()[-1] 

1709 .split(":") 

1710 ) 

1711 except (subprocess.CalledProcessError, IndexError): 

1712 fpath = [] 

1713 

1714 try: 

1715 zhome = yuio.exec.exec( 

1716 "zsh", 

1717 "-lc", 

1718 "echo -n ${ZDOTDIR:-$HOME}", 

1719 ).splitlines()[-1] 

1720 except (subprocess.CalledProcessError, IndexError): 

1721 zhome = pathlib.Path.home() 

1722 

1723 zhome = pathlib.Path(zhome) 

1724 zprofile_path = zhome / ".zprofile" 

1725 zprofile_append_text = f"\nfpath=({zsh_completions_home} $fpath)\n" 

1726 

1727 if install: 

1728 if str(zsh_completions_home) not in fpath: 

1729 with open(zprofile_path, "a") as f: 

1730 f.write(zprofile_append_text) 

1731 yuio.io.info( 

1732 "<c note>Note:</c> modified <c path>%s</c> to add <c path>%s</c> to `fpath`", 

1733 zprofile_path, 

1734 zsh_completions_home, 

1735 ) 

1736 elif zprofile_path.exists(): 

1737 zprofile_text = zprofile_path.read_text() 

1738 if zprofile_append_text in zprofile_text: 

1739 yuio.io.info( 

1740 "<c note>Note:</c> modifications to <c path>%s</c> are not removed" 

1741 " because other completions might rely on them", 

1742 zprofile_path, 

1743 ) 

1744 

1745 if not needs_cache_cleanup: 

1746 return 

1747 

1748 # Try to remove completions cache from the most common places. 

1749 for zcomp_basedir in [zhome, cache_home / "prezto"]: 

1750 if not zcomp_basedir.exists() or not zcomp_basedir.is_dir(): 

1751 continue 

1752 for file in zcomp_basedir.iterdir(): 

1753 if file.is_file() and re.match(r"^\.?zcompdump", file.name): 

1754 os.remove(file) 

1755 yuio.io.info( 

1756 "<c note>Note:</c> deleted Zsh completions cache at <c path>%s</c>", 

1757 file, 

1758 ) 

1759 

1760 try: 

1761 # Run zsh with the right flags in case zshrc runs compinit. 

1762 # If after generating completions user runs `zsh` without the `-l` flag, 

1763 # our changes to fpath will not be visible, and compinit will dump 

1764 # an invalid version of cache. To avoid this, we call zsh ourselves 

1765 # before the user has a chance to do it. Notice, though, that we don't 

1766 # run `compdump`. This is because we can't be sure that the user uses 

1767 # the default cache path (~/.zcompdump). 

1768 yuio.exec.exec("zsh", "-lc", "true") 

1769 except subprocess.CalledProcessError: 

1770 pass 

1771 

1772 

1773def _write_fish_script( 

1774 prog: str, 

1775 true_prog: str, 

1776 install: bool, 

1777 compdata_path: pathlib.Path, 

1778 data_home: pathlib.Path, 

1779 cache_home: pathlib.Path, 

1780 config_home: pathlib.Path, 

1781): 

1782 import yuio.io 

1783 

1784 if os.name == "nt": 

1785 yuio.io.warning( 

1786 "Skipped <c note>Fish</c>: completion script doesn't support windows" 

1787 ) 

1788 return 

1789 

1790 fish_completions_home = data_home / "fish/vendor_completions.d" 

1791 script_dest = fish_completions_home / (true_prog + ".fish") 

1792 

1793 if install: 

1794 _write_script(script_dest, "complete.fish", prog, true_prog, compdata_path) 

1795 yuio.io.info("Wrote <c note>Fish</c> script to <c path>%s</c>", script_dest) 

1796 elif script_dest.exists(): 

1797 os.remove(script_dest) 

1798 yuio.io.info("Removed <c path>%s</c>", script_dest) 

1799 

1800 

1801def _write_pwsh_script( 

1802 prog: str, 

1803 true_prog: str, 

1804 install: bool, 

1805 compdata_path: pathlib.Path, 

1806 data_home: pathlib.Path, 

1807 cache_home: pathlib.Path, 

1808 config_home: pathlib.Path, 

1809): 

1810 import yuio.exec 

1811 import yuio.io 

1812 

1813 if shutil.which("pwsh"): 

1814 command = "pwsh" 

1815 elif shutil.which("powershell"): 

1816 command = "powershell" 

1817 else: 

1818 yuio.io.warning( 

1819 "Skipped <c note>PowerShell</c>: `pwsh` command is not available" 

1820 ) 

1821 return 

1822 

1823 try: 

1824 pwsh_data = ( 

1825 yuio.exec.exec( 

1826 command, 

1827 "-Command", 

1828 'Write-Host "$($PSVersionTable.PSVersion);$PROFILE"', 

1829 ) 

1830 .splitlines()[-1] 

1831 .strip() 

1832 ) 

1833 except (subprocess.CalledProcessError, IndexError) as e: 

1834 yuio.io.warning( 

1835 "Skipped <c note>PowerShell</c>: failed to get powershell `$PROFILE` path: %s", 

1836 e, 

1837 ) 

1838 return 

1839 except FileNotFoundError: 

1840 yuio.io.warning("Skipped <c note>PowerShell</c>: `pwsh` command not found") 

1841 return 

1842 if match := re.match(r"^(\d+(?:\.\d+)*);(.*)$", pwsh_data): 

1843 version = match.group(1) 

1844 profile_s = match.group(2) 

1845 else: 

1846 yuio.io.warning( 

1847 "Skipped <c note>PowerShell</c>: can't determine powershell version" 

1848 ) 

1849 return 

1850 if not profile_s: 

1851 yuio.io.warning( 

1852 "Skipped <c note>PowerShell</c>: powershell `$PROFILE` path is empty" 

1853 ) 

1854 return 

1855 if tuple(int(v) for v in version.split(".")) < (5, 0, 0): 

1856 yuio.io.warning( 

1857 "Skipped <c note>PowerShell</c>: completions script requires " 

1858 "PowerShell 5 or newer, you have %s", 

1859 version, 

1860 ) 

1861 return 

1862 

1863 profile_path = pathlib.Path(profile_s).expanduser().resolve() 

1864 profile_path.parent.mkdir(exist_ok=True, parents=True) 

1865 

1866 data_dir = data_home / "yuio/pwsh" 

1867 loader_path = data_dir / "LoadCompletions.ps1" 

1868 script_dest = data_dir / f"_{true_prog}.ps1" 

1869 if install: 

1870 _write_script(script_dest, "complete.ps1", prog, true_prog, compdata_path) 

1871 yuio.io.info( 

1872 "Wrote <c note>PowerShell</c> script to <c path>%s</c>", script_dest 

1873 ) 

1874 _write_pwsh_loader(loader_path, data_dir) 

1875 elif script_dest.exists(): 

1876 os.remove(script_dest) 

1877 yuio.io.info("Removed <c path>%s</c>", script_dest) 

1878 

1879 try: 

1880 data_dirs = [ 

1881 pathlib.Path(f).expanduser().resolve() 

1882 for f in yuio.exec.exec( 

1883 command, 

1884 "-Command", 

1885 'Write-Host ($_YUIO_COMPL_V1_INIT_PATHS -join "`n")', 

1886 ) 

1887 .strip() 

1888 .splitlines() 

1889 ] 

1890 except (subprocess.CalledProcessError, FileNotFoundError): 

1891 return 

1892 

1893 pwsh_profile_append_text = f"\n. {loader_path}\n" 

1894 

1895 if install: 

1896 if data_dir not in data_dirs: 

1897 with open(profile_path, "a") as f: 

1898 f.write(pwsh_profile_append_text) 

1899 yuio.io.info( 

1900 "<c note>Note:</c> modified <c path>%s</c> to call <c path>%s</c> on startup", 

1901 profile_path, 

1902 loader_path, 

1903 ) 

1904 elif profile_path.exists(): 

1905 pwsh_profile_text = profile_path.read_text() 

1906 if pwsh_profile_append_text in pwsh_profile_text: 

1907 yuio.io.info( 

1908 "<c note>Note:</c> modifications to <c path>%s</c> are not removed" 

1909 " because other completions might rely on them", 

1910 profile_path, 

1911 ) 

1912 

1913 

1914def _write_script( 

1915 path: pathlib.Path, 

1916 script_name: str, 

1917 prog: str, 

1918 true_prog: str, 

1919 compdata_path: pathlib.Path, 

1920): 

1921 script_template = _read_script(script_name) 

1922 script = ( 

1923 (script_template) 

1924 .replace("@prog@", prog) 

1925 .replace("@true_prog@", true_prog) 

1926 .replace("@data@", str(compdata_path)) 

1927 .replace("@version@", yuio.__version__) 

1928 ) 

1929 

1930 path.parent.mkdir(exist_ok=True, parents=True) 

1931 path.write_text(script) 

1932 

1933 

1934def _read_script(script_name: str): 

1935 import zipfile 

1936 import zipimport 

1937 

1938 if isinstance(__loader__, zipimport.zipimporter): 

1939 # Yuio is imported directly from a wheel. 

1940 with zipfile.ZipFile(__loader__.archive) as archive: 

1941 script_template = archive.read("yuio/_complete/" + script_name) 

1942 return script_template.decode() 

1943 else: 

1944 script_template_path = pathlib.Path(__file__).parent / "_complete" / script_name 

1945 return script_template_path.read_text() 

1946 

1947 

1948def _write_pwsh_loader(loader_path: pathlib.Path, data_dir: pathlib.Path): 

1949 import yuio.io 

1950 

1951 loader_template_path = pathlib.Path(__file__).parent / "_complete/complete_init.ps1" 

1952 loader_template = loader_template_path.read_text() 

1953 

1954 loader_version = re.search( 

1955 r"^\s*#\s*LOADER_VERSION:\s*(\d+)\s*$", loader_template, re.MULTILINE 

1956 ) 

1957 assert loader_version 

1958 

1959 if loader_path.exists() and loader_path.is_file(): 

1960 current_loader = loader_path.read_text() 

1961 current_version_s = re.search( 

1962 r"^\s*#\s*LOADER_VERSION:\s*(\d+)\s*$", current_loader, re.MULTILINE 

1963 ) 

1964 

1965 if current_version_s is None: 

1966 yuio.io.warning( 

1967 "<c note>Note:</c> can't determine version of <c path>%s</c>, " 

1968 "file will be overridden", 

1969 loader_path, 

1970 ) 

1971 elif int(loader_version.group(1)) <= int(current_version_s.group(1)): 

1972 return 

1973 

1974 loader_template = loader_template.replace("@data@", str(data_dir)) 

1975 loader_path.write_text(loader_template) 

1976 yuio.io.info("Wrote <c note>PowerShell</c> script to <c path>%s</c>", loader_path)