Coverage for yuio / complete.py: 78%

761 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-05 11:41 +0000

1# Yuio project, MIT license. 

2# 

3# https://github.com/taminomara/yuio/ 

4# 

5# You're free to copy this file to your project and edit it for your needs, 

6# just keep this copyright line please :3 

7 

8""" 

9This module provides autocompletion functionality for widgets and CLI. 

10 

11 

12Completer basics 

13---------------- 

14 

15All completers are derived from the :class:`Completer` base class 

16with a simple interface: 

17 

18.. autoclass:: Completer 

19 :members: 

20 

21.. autoclass:: Completion 

22 :members: 

23 

24 

25Completers 

26---------- 

27 

28Yuio provides basic completers that cover most of the cases: 

29 

30.. autoclass:: Empty 

31 

32.. autoclass:: Alternative 

33 

34.. autoclass:: Choice 

35 

36.. autoclass:: Option 

37 :members: 

38 

39.. autoclass:: List 

40 

41.. autoclass:: Tuple 

42 

43.. autoclass:: File 

44 

45.. autoclass:: Dir 

46 

47 

48Implementing your own completer 

49------------------------------- 

50 

51To implement a custom completer, subclass :class:`Completer` and implement 

52its :meth:`~Completer._process` method. 

53 

54.. note:: 

55 

56 When using a custom completer for CLI flags in :mod:`yuio.app`, 

57 completion script will invoke your program with special arguments 

58 to run the completer and get its result. 

59 

60.. class:: Completer 

61 :noindex: 

62 

63 .. automethod:: _process 

64 

65.. autoclass:: CompletionCollector 

66 

67""" 

68 

69from __future__ import annotations 

70 

71import abc 

72import contextlib 

73import dataclasses 

74import functools 

75import json 

76import math 

77import os 

78import pathlib 

79import re 

80import shutil 

81import string 

82import subprocess 

83import sys 

84from dataclasses import dataclass 

85 

86import yuio 

87import yuio.string 

88 

89import typing 

90import yuio._typing_ext as _tx 

91from typing import TYPE_CHECKING 

92 

93if TYPE_CHECKING: 

94 import typing_extensions as _t 

95else: 

96 from yuio import _typing as _t 

97 

98__all__ = [ 

99 "Alternative", 

100 "Choice", 

101 "Completer", 

102 "Completion", 

103 "CompletionCollector", 

104 "Dir", 

105 "Empty", 

106 "File", 

107 "List", 

108 "Option", 

109 "Tuple", 

110] 

111 

112 

113@dataclass(frozen=True, slots=True) 

114@functools.total_ordering 

115class Completion: 

116 """ 

117 A single completion. 

118 

119 """ 

120 

121 iprefix: str 

122 """ 

123 See :class:`CompletionCollector.iprefix` for details. 

124 

125 """ 

126 

127 completion: str 

128 """ 

129 Text of the completion. 

130 

131 """ 

132 

133 rsuffix: str 

134 """ 

135 See :class:`CompletionCollector.rsuffix` for details. 

136 

137 """ 

138 

139 rsymbols: str 

140 """ 

141 See :class:`CompletionCollector.rsymbols` for details. 

142 

143 """ 

144 

145 isuffix: str 

146 """ 

147 See :class:`CompletionCollector.isuffix` for details. 

148 

149 """ 

150 

151 comment: str | None 

152 """ 

153 Short comment displayed alongside the completion. 

154 

155 """ 

156 

157 dprefix: str 

158 """ 

159 Prefix that will be displayed before :attr:`~Completion.completion` 

160 when listing completions, but will not be inserted once completion 

161 is applied. 

162 

163 """ 

164 

165 dsuffix: str 

166 """ 

167 Like :attr:`~Completion.dprefix`, but it's a suffix. 

168 

169 """ 

170 

171 group_id: _tx.SupportsLt[_t.Any] = dataclasses.field(repr=False) 

172 """ 

173 Group id, used to sort completions. 

174 

175 Actual content of this property is an implementation detail. 

176 

177 """ 

178 

179 group_color_tag: str | None 

180 """ 

181 Color tag that's used when displaying this completion. 

182 

183 See :meth:`CompletionCollector.add_group` for details. 

184 

185 """ 

186 

187 def __lt__(self, other: Completion) -> bool: 

188 """ 

189 Completions are ordered by their groups and then alphabetically. 

190 

191 """ 

192 

193 return self.group_id < other.group_id or ( 

194 self.group_id == other.group_id and self.completion < other.completion 

195 ) 

196 

197 

198@dataclass(init=False, eq=False, repr=False, match_args=False) 

199class CompletionCollector: 

200 """ 

201 A class that collects completions as completers are running. 

202 

203 The text that is being completed is split into four parts, similar 

204 to what you might see in ZSH completion widgets. The main two are: 

205 

206 .. autoattribute:: prefix 

207 

208 .. autoattribute:: suffix 

209 

210 When completions are added to the collector, they are checked against 

211 the current prefix to determine if they match the entered text. If they 

212 do, the completion system will replace text from `prefix` and `suffix` 

213 with the new completion string. 

214 

215 The two additional parts are: 

216 

217 .. autoattribute:: iprefix 

218 

219 .. autoattribute:: isuffix 

220 

221 For example, suppose you're completing a second element 

222 of a colon-separated list. The list completer will set up 

223 the collector so that `prefix` and `suffix` contain parts of the 

224 current list element, while `iprefix` and `isuffix` contain 

225 the rest of the elements: 

226 

227 .. code-block:: text 

228 

229 list_element_1:list_el|ement_2:list_element_3 

230 └┬────────────┘└┬────┘│└┬────┘└┬────────────┘ 

231 iprefix prefix │ suffix isuffix 

232 └ cursor 

233 

234 Now, if the completer adds a completion ``"list_elements"``, 

235 this text will replace the `prefix` and `suffix`, but not `iprefix` 

236 and `isuffix`. So, after the completion is applied, the string will 

237 look like so: 

238 

239 .. code-block:: text 

240 

241 list_element_1:list_elements:list_element_3 

242 └┬──────────┘ 

243 this got replaced 

244 

245 Finally, there is `rsuffix`: 

246 

247 .. autoattribute:: rsuffix 

248 

249 .. autoattribute:: rsymbols 

250 

251 So, when completing a colon-separated list, colons will be added and removed 

252 automatically, similar to how ZSH does it. 

253 

254 .. autoattribute:: dedup_words 

255 

256 .. autoattribute:: full_prefix 

257 

258 .. autoattribute:: full_suffix 

259 

260 .. autoattribute:: text 

261 

262 .. autoattribute:: num_completions 

263 

264 .. automethod:: add 

265 

266 .. automethod:: add_group 

267 

268 .. automethod:: save_state 

269 

270 .. automethod:: split_off_prefix 

271 

272 .. automethod:: split_off_suffix 

273 

274 .. automethod:: finalize 

275 

276 """ 

277 

278 iprefix: str 

279 """ 

280 Contains text that goes before the :attr:`~CompletionCollector.prefix`. 

281 

282 This prefix is not considered when checking whether a completion 

283 matches a text, and it is not replaced by the completion. It will also 

284 not be shown in the table of completions. 

285 

286 This prefix starts empty, and then parts of :attr:`~CompletionCollector.prefix` 

287 are moved to :attr:`~CompletionCollector.iprefix` as completers split it into 

288 list elements. 

289 

290 """ 

291 

292 prefix: str 

293 """ 

294 Portion of the completed text before the cursor. 

295 

296 """ 

297 

298 suffix: str 

299 """ 

300 Portion of the completed text after the cursor. 

301 

302 """ 

303 

304 rsuffix: str 

305 """ 

306 Starts empty, and may be set to hold a list separator. 

307 

308 This suffix will be added after the completion. However, it will be automatically 

309 removed if the user types one of :attr:`CompletionCollector.rsymbols`, 

310 or moves cursor, or alters input in some other way. 

311 

312 This property is mutable and can be changed by completers. 

313 

314 """ 

315 

316 rsymbols: str 

317 """ 

318 If user types one of the symbols from this string, 

319 :attr:`~.CompletionCollector.rsuffix` will be removed. 

320 

321 This property is mutable and can be changed by completers. 

322 

323 """ 

324 

325 isuffix: str 

326 """ 

327 Similar to :attr:`CompletionCollector.iprefix`, but for suffixes. 

328 

329 """ 

330 

331 dedup_words: frozenset[str] 

332 """ 

333 Completions from this set will not be added. This is useful 

334 when completing lists of unique values. 

335 

336 This property is mutable and can be changed by completers. 

337 

338 """ 

339 

340 # Internal fields. 

341 _group_id: int 

342 _group_sorted: bool 

343 _group_color_tag: str | None 

344 

345 def __init__(self, text: str, pos: int, /): 

346 self.iprefix = "" 

347 self.prefix = text[:pos] 

348 self.suffix = text[pos:] 

349 self.rsuffix = "" 

350 self.rsymbols = "" 

351 self.isuffix = "" 

352 self.dedup_words = frozenset() 

353 

354 self._group_id = 0 

355 self._group_sorted = True 

356 self._group_color_tag = None 

357 

358 self._completions: list[Completion] = [] 

359 

360 @property 

361 def full_prefix(self) -> str: 

362 """ 

363 Portion of the final completed text that goes before the cursor. 

364 

365 """ 

366 

367 return self.iprefix + self.prefix 

368 

369 @property 

370 def full_suffix(self) -> str: 

371 """ 

372 Portion of the final completed text that goes after the cursor. 

373 

374 """ 

375 

376 return self.suffix + self.isuffix 

377 

378 @property 

379 def text(self) -> str: 

380 """ 

381 Portion of the text that is being autocompleted. 

382 

383 """ 

384 

385 return self.prefix + self.suffix 

386 

387 @contextlib.contextmanager 

388 def save_state(self): 

389 """ 

390 Save current state of the collector, i.e. prefixes, 

391 suffixes, etc., upon entering this context manager, 

392 then restore state upon exiting. 

393 

394 Use this context manager when you need to call nested 

395 completers more than once to prevent changes made in 

396 one nested completer bleeding out into another 

397 nested completer. 

398 

399 """ 

400 

401 state = {f.name: getattr(self, f.name) for f in dataclasses.fields(self)} 

402 

403 try: 

404 yield 

405 finally: 

406 for name, value in state.items(): 

407 setattr(self, name, value) 

408 

409 def add( 

410 self, 

411 completion: str, 

412 /, 

413 *, 

414 comment: str | None = None, 

415 dprefix: str = "", 

416 dsuffix: str = "", 

417 color_tag: str | None = None, 

418 ): 

419 """ 

420 Add a new completion. 

421 

422 :param completion: 

423 completed text without :attr:`~CompletionCollector.iprefix` 

424 and :attr:`~CompletionCollector.isuffix`. This text will replace 

425 :attr:`~CompletionCollector.prefix` and :attr:`~CompletionCollector.suffix`. 

426 :param comment: 

427 additional comment that will be displayed near the completion. 

428 :param color_tag: 

429 allows overriding color tag from the group. 

430 

431 """ 

432 

433 if ( 

434 completion 

435 and completion not in self.dedup_words 

436 and completion.startswith(self.prefix) 

437 ): 

438 self._add( 

439 completion, 

440 comment=comment, 

441 dprefix=dprefix, 

442 dsuffix=dsuffix, 

443 color_tag=color_tag, 

444 ) 

445 

446 def _add( 

447 self, 

448 completion: str, 

449 /, 

450 *, 

451 comment: str | None = None, 

452 dprefix: str = "", 

453 dsuffix: str = "", 

454 color_tag: str | None = None, 

455 ): 

456 if not self.isuffix or self.isuffix[0] in string.whitespace: 

457 # Only add `rsuffix` if we're at the end of an array element. 

458 # Don't add `rsuffix` if we're in the middle of an array, unless the array 

459 # is separated by spaces. 

460 rsuffix = self.rsuffix 

461 rsymbols = self.rsymbols 

462 else: 

463 rsuffix = "" 

464 rsymbols = "" 

465 

466 if self._group_sorted: 

467 group_id = (self._group_id, 0) 

468 else: 

469 group_id = (self._group_id, len(self._completions)) 

470 

471 if color_tag is None: 

472 color_tag = self._group_color_tag 

473 

474 self._completions.append( 

475 Completion( 

476 iprefix=self.iprefix, 

477 completion=completion, 

478 rsuffix=rsuffix, 

479 rsymbols=rsymbols, 

480 isuffix=self.isuffix, 

481 comment=comment, 

482 dprefix=dprefix, 

483 dsuffix=dsuffix, 

484 group_id=group_id, 

485 group_color_tag=color_tag, 

486 ) 

487 ) 

488 

489 def add_group(self, /, *, sorted: bool = True, color_tag: str | None = None): 

490 """ 

491 Add a new completions group. 

492 

493 All completions added after call to this method will be placed to the new group. 

494 They will be grouped together, and colored according to the group's color tag. 

495 

496 :param sorted: 

497 controls whether completions in the new group 

498 should be sorted lexicographically. 

499 :param color_tag: 

500 which color tag should be used to display completions 

501 and their help messages for this group. 

502 

503 See :attr:`yuio.widget.Option.color_tag` for details. 

504 

505 """ 

506 

507 self._group_id += 1 

508 self._group_sorted = sorted 

509 self._group_color_tag = color_tag 

510 

511 @property 

512 def num_completions(self) -> int: 

513 """ 

514 Number of completions added so far. 

515 

516 """ 

517 

518 return len(self._completions) 

519 

520 def split_off_prefix(self, delim: str | None = None, /): 

521 """ 

522 Move everything up to the last occurrence of `delim` 

523 from :attr:`~CompletionCollector.prefix` 

524 to :attr:`~CompletionCollector.iprefix`. 

525 

526 :param delim: 

527 delimiter to split off; :data:`None` value splits off on any whitespace 

528 character, similar to :meth:`str.rsplit`. 

529 

530 """ 

531 

532 delim = delim or " " 

533 parts = self.prefix.rsplit(delim, maxsplit=1) 

534 if len(parts) > 1: 

535 self.iprefix += parts[0] + delim 

536 self.prefix = parts[1] 

537 

538 def split_off_suffix(self, delim: str | None = None, /): 

539 """ 

540 Move everything past the first occurrence of `delim` 

541 from :attr:`~CompletionCollector.suffix` 

542 to :attr:`~CompletionCollector.isuffix`. 

543 

544 :param delim: 

545 delimiter to split off; :data:`None` value splits off on any whitespace 

546 character, similar to :meth:`str.split`. 

547 

548 """ 

549 

550 delim = delim or " " 

551 parts = self.suffix.split(delim, maxsplit=1) 

552 if len(parts) > 1: 

553 self.suffix = parts[0] 

554 self.isuffix = delim + parts[1] + self.isuffix 

555 

556 def finalize(self, *, derive_common_prefix: bool = True) -> list[Completion]: 

557 """ 

558 Finish collecting completions and return everything that was collected. 

559 

560 Do not reuse a collector after it was finalized. 

561 

562 :returns: 

563 list of completions, sorted by their groups and preferred ordering 

564 within each group. 

565 

566 If all completions start with a common prefix, a single completion 

567 is returned containing this prefix. 

568 

569 """ 

570 

571 if len(self._completions) > 1: 

572 c0 = self._completions[0] 

573 

574 iprefix = c0.iprefix 

575 isuffix = c0.isuffix 

576 

577 if ( 

578 derive_common_prefix 

579 and self.full_prefix.startswith(iprefix) 

580 and self.full_suffix.endswith(isuffix) 

581 and all( 

582 c.iprefix == iprefix and c.isuffix == isuffix 

583 for c in self._completions 

584 ) 

585 ): 

586 # If all completions have the same `iprefix` and `isuffix`... 

587 common_prefix = _commonprefix( 

588 list(c.completion for c in self._completions) 

589 ) 

590 if common_prefix and len(iprefix) + len(common_prefix) > len( 

591 self.iprefix 

592 ) + len(self.prefix): 

593 # ...and they have a common prefix that is longer than what's entered so far, 

594 # then complete this common prefix. 

595 rsuffix = "" 

596 rsymbols = "" 

597 if all( 

598 common_prefix == c.completion and rsuffix == c.rsuffix 

599 for c in self._completions 

600 ): 

601 # If completing common prefix actually fulfills a completion, add `rsuffix` as well. 

602 rsuffix = c0.rsuffix 

603 rsymbols = c0.rsymbols 

604 return [ 

605 Completion( 

606 iprefix=iprefix, 

607 completion=common_prefix, 

608 rsuffix=rsuffix, 

609 rsymbols=rsymbols, 

610 isuffix=isuffix, 

611 comment=None, 

612 dprefix="", 

613 dsuffix="", 

614 group_id=(0, 0), 

615 group_color_tag=None, 

616 ) 

617 ] 

618 

619 self._completions.sort() 

620 return self._completions 

621 

622 

# Edit-distance allowance used by `_CorrectingCollector.add`: a candidate is
# offered as a correction when its distance from the typed text is at most
# `_MAX_COMPLETION_CORRECTIONS + _MAX_COMPLETION_CORRECTIONS_RATE * avg_len`,
# where `avg_len` is the mean length of the typed text and the candidate.
_MAX_COMPLETION_CORRECTIONS: int = 1
_MAX_COMPLETION_CORRECTIONS_RATE: float = 1 / 3

625 

626 

@_t.final
class _CorrectingCollector(CompletionCollector):
    """
    A collector that matches candidates by edit distance instead of by prefix.

    Candidates within one correction of the entered text are added as ordinary
    completions. Candidates that are further away, but still under the length
    dependent threshold, are added to a separate "corrected" group. If at least
    one such correction was collected, `finalize` also appends the text
    as it was originally entered.

    """

    def __init__(self, text: str, pos: int):
        super().__init__(text, pos)

        # Set once a candidate lands in the "corrected" group.
        self._has_corrections = False

    def add(
        self,
        completion: str,
        /,
        *,
        comment: str | None = None,
        dprefix: str = "",
        dsuffix: str = "",
        color_tag: str | None = None,
    ):
        """
        Add a completion if it is close enough to the text being completed.

        Empty completions and completions listed in `dedup_words` are skipped.
        Parameters have the same meaning as in `CompletionCollector.add`.

        """

        if not completion or completion in self.dedup_words:
            return

        a = self.prefix + self.suffix
        b = completion
        corrections = _corrections(a, b)
        threshold = (
            _MAX_COMPLETION_CORRECTIONS
            + _MAX_COMPLETION_CORRECTIONS_RATE * (len(a) + len(b)) / 2
        )

        if corrections <= 1:
            # this is a simple mistype, add it as usual
            self._add(
                completion,
                comment=comment,
                dprefix=dprefix,
                dsuffix=dsuffix,
                color_tag=color_tag,
            )
        elif corrections <= threshold:
            # this is a correction, add it into corrections group
            if comment:
                comment = "corrected: " + comment
            else:
                comment = "corrected"
            # Group settings are overridden only for this one `_add` call.
            with self.save_state():
                self._group_id = 0xFFFFFFFE  # (big enough) - 1
                self._group_color_tag = "corrected"
                self._add(
                    completion,
                    comment=comment,
                    dprefix=dprefix,
                    dsuffix=dsuffix,
                    color_tag=color_tag,
                )
            self._has_corrections = True

    def finalize(self, *, derive_common_prefix: bool = True) -> list[Completion]:
        """
        Return collected completions, sorted.

        If any corrections were collected, the originally entered text is
        appended as the very last entry. `derive_common_prefix` is accepted
        for interface compatibility; this collector never merges completions.

        """

        if self._has_corrections:
            c0 = self._completions[0]

            iprefix = ""
            prefix = self.full_prefix
            suffix = self.full_suffix
            isuffix = ""

            if prefix.startswith(c0.iprefix):
                cut = len(c0.iprefix)
                iprefix = prefix[:cut]
                prefix = prefix[cut:]

            if suffix.endswith(c0.isuffix):
                # Slice by an absolute index: `suffix[-0:]` would select
                # the whole string when `c0.isuffix` is empty.
                cut = len(suffix) - len(c0.isuffix)
                isuffix = suffix[cut:]
                suffix = suffix[:cut]

            # If we have corrections, add original value to the end.
            with self.save_state():
                self._group_id = 0xFFFFFFFF  # (big enough)
                self._group_color_tag = "original"
                self.iprefix = iprefix
                self.isuffix = isuffix
                self._add(prefix + suffix, comment="original")

        self._completions.sort()
        return self._completions

711 

712 

def _corrections(a: str, b: str) -> float:
    """
    Count the edits needed to turn `a` into `b`, ignoring case.

    This is the Optimal String Alignment variant of the Damerau–Levenshtein
    distance: insertions, deletions, substitutions and swaps of two adjacent
    characters each cost one edit.

    :param a:
        first string, usually the text that was entered.
    :param b:
        second string, usually a completion candidate.
    :returns:
        number of edits; zero if strings are equal after case folding.

    """

    a = a.casefold()
    b = b.casefold()

    # d[i][j] is the distance between a[:i] and b[:j].
    d: list[list[float]] = [[0.0] * (len(b) + 1) for _ in range(len(a) + 1)]
    for i in range(len(a) + 1):
        d[i][0] = i
    for j in range(len(b) + 1):
        d[0][j] = j

    for i in range(1, len(a) + 1):
        for j in range(1, len(b) + 1):
            mismatch = a[i - 1] != b[j - 1]
            best = min(
                # Add to `a`:
                d[i - 1][j] + 1,
                # Add to `b`:
                d[i][j - 1] + 1,
                # Replace:
                d[i - 1][j - 1] + mismatch,
            )
            # Transpose: the last two characters of both slices are swapped.
            # Checked from the second character on, so that a swap at the very
            # beginning of the strings is recognized as well.
            if i > 1 and j > 1 and a[i - 1] == b[j - 2] and a[i - 2] == b[j - 1]:
                best = min(best, d[i - 2][j - 2] + mismatch)
            d[i][j] = best

    return d[-1][-1]

741 

742 

def _commonprefix(m: list[str]) -> str:
    """
    Return the longest string that every item of `m` starts with.

    Only the lexicographically smallest and largest items are compared:
    whatever prefix these two share is shared by everything in between.

    """

    if not m:
        return ""

    lowest, highest = min(m), max(m)
    for index, (left, right) in enumerate(zip(lowest, highest)):
        if left != right:
            return lowest[:index]
    return lowest

752 

753 

class Completer(abc.ABC):
    """
    An interface for text completion providers.

    """

    def complete(
        self,
        text: str,
        pos: int,
        /,
        *,
        do_corrections: bool = True,
        derive_common_prefix: bool = True,
    ) -> list[Completion]:
        """
        Complete the given text at the given cursor position.

        :param text:
            text that is being completed.
        :param pos:
            position of the cursor in the text. ``0`` means the cursor
            is before the first character, ``len(text)`` means the cursor
            is after the last character.
        :param do_corrections:
            if :data:`True` (default), completion system will try to guess
            if there are any misspells in the `text`, and offer to correct them.
        :param derive_common_prefix:
            if :data:`True` (default), and all returned completions have a non-empty
            common prefix, return a single completion with this prefix instead.
        :returns:
            a sorted list of completions.

            If all completions start with a common prefix, a single completion
            is returned containing this prefix.

        """

        # First pass: plain prefix matching.
        collector = CompletionCollector(text, pos)
        with collector.save_state():
            self._process(collector)
        completions = collector.finalize(derive_common_prefix=derive_common_prefix)
        if completions or not do_corrections:
            return completions

        # Nothing matched: run the completer again with a collector
        # that accepts candidates close to the entered text.
        collector = _CorrectingCollector(text, pos)
        with collector.save_state():
            self._process(collector)
        return collector.finalize(derive_common_prefix=derive_common_prefix)

    @abc.abstractmethod
    def _process(self, collector: CompletionCollector, /):
        """
        Generate completions and add them to the given collector.

        Implementing this class is straightforward, just feed all possible
        completions to the collector. For example, let's implement a completer
        for environment variables:

        .. code-block:: python

            class EnvVarCompleter(Completer):
                def _process(self, collector: CompletionCollector):
                    for var in os.environ.keys():
                        collector.add(var)

        """

        raise NotImplementedError()

    def _get_completion_model(
        self, *, is_many: bool = False
    ) -> _OptionSerializer.Model:
        """
        Internal, do not use.

        """

        return _OptionSerializer.CustomCompleter(self)

833 

834 

class Empty(Completer):
    """
    A completer that never suggests anything.

    """

    def _process(self, collector: CompletionCollector):
        # Intentionally adds nothing to the collector.
        return None

    def _get_completion_model(
        self, *, is_many: bool = False
    ) -> _OptionSerializer.Model:
        empty_model = _OptionSerializer.Model()
        return empty_model

848 

849 

850@dataclass(frozen=True, slots=True) 

851class Option: 

852 """ 

853 A single completion option for the :class:`Choice` completer. 

854 

855 """ 

856 

857 completion: str 

858 """ 

859 This string will replace an element that is being completed. 

860 

861 """ 

862 

863 comment: str | None = None 

864 """ 

865 Short comment displayed alongside the completion. 

866 

867 """ 

868 

869 

class Choice(Completer):
    """
    Offers completions from a fixed collection of options.

    :param choices:
        options that can be suggested.

    """

    def __init__(self, choices: _t.Collection[Option], /):
        self._choices: _t.Collection[Option] = choices

    def _process(self, collector: CompletionCollector, /):
        # The collector decides which options match the entered text.
        for option in self._choices:
            collector.add(option.completion, comment=option.comment)

    def _get_completion_model(
        self, *, is_many: bool = False
    ) -> _OptionSerializer.Model:
        has_comments = any(option.comment for option in self._choices)
        if not has_comments:
            # No option has a comment: a plain list of words is enough.
            words = [option.completion for option in self._choices]
            return _OptionSerializer.Choice(words)

        described = [
            (option.completion, option.comment or "") for option in self._choices
        ]
        return _OptionSerializer.ChoiceWithDesc(described)

897 

898 

class Alternative(Completer):
    """
    Combines results of several completers.

    :param completers:
        list of inner completers.

        Each item is a tuple of two elements. The first one describes
        a completion group; it is displayed when this completer is used in shells
        that support it (namely, ZSH). The second one is the inner completer
        itself.

    """

    def __init__(self, completers: list[tuple[str, Completer]], /):
        self._completers = completers

    def _process(self, collector: CompletionCollector, /):
        for _description, inner in self._completers:
            # Each inner completer starts from the same collector state.
            with collector.save_state():
                collector.add_group()
                inner._process(collector)

    def _get_completion_model(
        self, *, is_many: bool = False
    ) -> _OptionSerializer.Model:
        models = []
        for description, inner in self._completers:
            models.append((description, inner._get_completion_model(is_many=is_many)))
        return _OptionSerializer.Alternative(models)

931 

932 

class List(Completer):
    """
    Completes a list of elements separated by a delimiter.

    :param inner:
        completer used for every list item.
    :param delimiter:
        a character that separates list items. :data:`None` separates by any whitespace
        character, similar to :meth:`str.split`.
    :param allow_duplicates:
        whether to suggest completions that are already present in the list.

    """

    def __init__(
        self,
        inner: Completer,
        /,
        *,
        delimiter: str | None = None,
        allow_duplicates: bool = False,
    ):
        if delimiter == "":
            raise ValueError("empty delimiter")

        self._inner = inner
        self._delimiter = delimiter
        self._allow_duplicates = allow_duplicates

    def _process(self, collector: CompletionCollector, /):
        separator = self._delimiter

        # Narrow the collector down to the list item under the cursor.
        collector.split_off_prefix(separator)
        collector.split_off_suffix(separator)
        collector.rsuffix = separator or " "
        collector.rsymbols += separator or string.whitespace

        if self._allow_duplicates:
            seen = frozenset()
        else:
            # Every other item of the list is excluded from suggestions,
            # except for the text of the item that is being completed.
            words = {
                *collector.iprefix.split(separator),
                *collector.isuffix.split(separator),
            }
            words.discard(collector.text)
            seen = frozenset(words)
        collector.dedup_words = seen

        self._inner._process(collector)

    def _get_completion_model(
        self, *, is_many: bool = False
    ) -> _OptionSerializer.Model:
        model_type = _OptionSerializer.ListMany if is_many else _OptionSerializer.List
        return model_type(self._delimiter or " ", self._inner._get_completion_model())

991 

992 

class Tuple(Completer):
    """
    Completes a tuple of elements separated by a delimiter, where every
    position has its own completer.

    :param inner:
        completers for tuple elements, one per position.
    :param delimiter:
        a character that separates list items. :data:`None` separates by any whitespace
        character, similar to :meth:`str.split`.

    """

    def __init__(self, *inner: Completer, delimiter: str | None = None):
        if delimiter == "":
            raise ValueError("empty delimiter")

        self._inner = inner
        self._delimiter = delimiter

    def _process(self, collector: CompletionCollector, /):
        separator = self._delimiter
        typed = collector.prefix

        # One-based number of the element that the cursor is in.
        count = len(typed.split(separator))
        if (
            separator is None
            and count
            and typed
            and typed[-1] in string.whitespace
        ):
            # `.split(None)` will trim whitespaces at the end.
            # Make sure we count those towards the current position in the tuple.
            count += 1
        if count > len(self._inner):
            # Cursor is past the last element, there is nothing to complete.
            return
        index = count - 1 if count > 0 else count

        collector.split_off_prefix(separator)
        collector.split_off_suffix(separator)
        collector.rsuffix = separator or " "
        collector.rsymbols += separator or string.whitespace

        self._inner[index]._process(collector)

    def _get_completion_model(
        self, *, is_many: bool = False
    ) -> _OptionSerializer.Model:
        model_type = _OptionSerializer.TupleMany if is_many else _OptionSerializer.Tuple
        return model_type(
            self._delimiter or " ",
            [completer._get_completion_model() for completer in self._inner],
        )

1047 

1048 

class File(Completer):
    """
    Completes file paths.

    Directories are always offered, so that the user can descend into them;
    other entries are offered only if they pass the extension filter.

    :param extensions:
        allowed file extensions, should include the leading dot.
        Can be a single extension or a collection of them;
        :data:`None` (default) disables filtering.

    """

    def __init__(self, extensions: str | _t.Collection[str] | None = None):
        # Normalize to a list of extensions, or `None` for "no filtering".
        if isinstance(extensions, str):
            self._extensions = [extensions]
        elif extensions is not None:
            self._extensions = list(extensions)
        else:
            self._extensions = None

    def _process(self, collector: CompletionCollector, /):
        """
        List the directory named in the prefix and add its entries.

        """

        # Directory part of the entered path goes to `iprefix`,
        # only the last path component is matched against entries.
        base, name = os.path.split(collector.prefix)
        if base and not base.endswith(os.path.sep):
            base += os.path.sep
        collector.iprefix += base
        collector.prefix = name
        # Only the rest of the current path component is replaced.
        collector.suffix = collector.suffix.split(os.sep, maxsplit=1)[0]
        resolved = pathlib.Path(base).expanduser().resolve()
        # Remember `rsuffix` set by outer completers: it is cleared
        # for directories and restored for other entries.
        rsuffix = collector.rsuffix
        if resolved.is_dir():
            if name.startswith("."):
                collector.rsuffix = ""
                collector.add(os.path.curdir + os.path.sep, color_tag="dir")
                collector.add(os.path.pardir + os.path.sep, color_tag="dir")
            if name.startswith("~"):
                collector.rsuffix = ""
                collector.add("~" + os.path.sep, color_tag="dir")
            try:
                for path in resolved.iterdir():
                    if path.is_dir():
                        if path.is_symlink():
                            color_tag = "symlink"
                            dsuffix = "@"
                        else:
                            color_tag = "dir"
                            dsuffix = ""
                        # Directories end with a path separator and get no `rsuffix`.
                        collector.rsuffix = ""
                        collector.add(
                            path.name + os.sep, color_tag=color_tag, dsuffix=dsuffix
                        )
                    elif self._extensions is None or any(
                        path.name.endswith(ext) for ext in self._extensions
                    ):
                        collector.rsuffix = rsuffix
                        # Pick a color tag and a display-only suffix by entry type.
                        color_tag = None
                        dsuffix = ""
                        if path.is_symlink():
                            color_tag = "symlink"
                            dsuffix = "@"
                        elif path.is_file():
                            if (os.name != "nt" and os.access(path, os.X_OK)) or (
                                os.name == "nt" and path.suffix == ".exe"
                            ):
                                color_tag = "exec"
                                dsuffix = "*"
                            else:
                                color_tag = "file"
                        elif path.is_socket():
                            color_tag = "socket"
                            dsuffix = "="
                        elif path.is_fifo():
                            color_tag = "pipe"
                            dsuffix = "|"
                        elif path.is_block_device():
                            color_tag = "block_device"
                            dsuffix = "#"
                        elif path.is_char_device():
                            color_tag = "char_device"
                            dsuffix = "%"
                        collector.add(path.name, color_tag=color_tag, dsuffix=dsuffix)
            except PermissionError:
                # Stop listing; completions added so far are kept.
                return

    def _get_completion_model(
        self, *, is_many: bool = False
    ) -> _OptionSerializer.Model:
        # Extensions are passed without leading dots, joined by `|`.
        return _OptionSerializer.File(
            "|".join(extension.lstrip(".") for extension in self._extensions or [])
        )

1135 

1136 

class Dir(File):
    """
    Completes paths to directories.

    """

    def __init__(self):
        # An empty list of extensions matches no regular file,
        # which leaves only directories in the results.
        super().__init__(extensions=[])

    def _get_completion_model(
        self, *, is_many: bool = False
    ) -> _OptionSerializer.Model:
        dir_model = _OptionSerializer.Dir()
        return dir_model

1150 

1151 

class _CustomCompleterRegistrar:
    """
    Hands out sequential indices, starting from zero.

    """

    def __init__(self) -> None:
        # Index that the next call to `_register_custom_completer` returns.
        self._custom_completer_index = 0

    def _register_custom_completer(self) -> int:
        """
        Reserve the next free index and return it.

        """

        self._custom_completer_index += 1
        return self._custom_completer_index - 1

1160 

1161 

class _OptionSerializer(_CustomCompleterRegistrar):
    """
    Serializes one CLI option into a single line of tab-separated completion data.

    The line consists of the command path, space-separated flags, a short
    description, escaped metavars, ``nargs``, and the dumped completion model.

    """

    # Characters that would break the tab/newline-separated format
    # are replaced with spaces (one replacement per source character).
    _SPECIAL_SYMBOLS = str.maketrans("\r\n\a\b\t", " " * 5)

    def __init__(
        self,
        flags: list[str],
        path: str,
        nargs: str | int,
        metavar: str | tuple[str, ...],
        help: str | yuio.Disabled,
    ):
        super().__init__()

        self._flags = flags
        self._path = path
        self._nargs = nargs
        # Metavars are always stored as a tuple.
        self._metavar = metavar if isinstance(metavar, tuple) else (metavar,)
        self._help: str | yuio.Disabled = help

    def dump(self, model: _OptionSerializer.Model):
        """
        Render this option together with `model` as one tab-separated line.

        :param model:
            completion model describing how option's values are completed.
        :returns:
            a single line; special symbols inside fields are replaced with spaces.

        """

        if self._help is yuio.DISABLED:
            desc = "__yuio_hide__"
        else:
            desc = self._process_help(self._help)

        compspec = [
            self._path,
            " ".join(self._flags),
            desc,
            " ".join(self._escape_metavar(m) for m in self._metavar),
            str(self._nargs),
            *model.dump(self),
        ]

        return "\t".join(item.translate(self._SPECIAL_SYMBOLS) for item in compspec)

    @staticmethod
    def _escape_metavar(metavar: object) -> str:
        # Metavars are joined with spaces, so spaces inside a metavar become
        # ``\S`` and backslashes become ``\L``.
        return re.sub(
            r"[\\ ]",
            lambda s: "\\S" if s.group() == " " else "\\L",
            str(metavar),
        )

    @staticmethod
    def _process_help(help: str):
        """
        Turn a help message into a short one-paragraph description.

        The first letter is lowercased when it starts an ordinary word, only
        the first paragraph is kept, a single trailing period is removed
        (ellipsis is preserved), and color tags are stripped.

        """

        if (
            len(help) > 2
            and help[0].isupper()
            and (help[1].islower() or help[1].isspace())
        ):
            help = help[0].lower() + help[1:]
        # Cut to the first paragraph *before* looking at the trailing period;
        # otherwise the period of the last paragraph is removed and the one
        # ending the first paragraph is kept.
        if (index := help.find("\n\n")) != -1:
            help = help[:index]
        if help.endswith(".") and not help.endswith(".."):
            help = help[:-1]
        return yuio.string.strip_color_tags(help)

    @staticmethod
    def _dump_nested(compspec: _t.Iterable[object], s: _OptionSerializer) -> list[str]:
        """
        Flatten every item of `compspec` into a list of strings.

        """

        contents = []

        for item in compspec:
            contents.extend(_OptionSerializer._dump_nested_item(item, s))

        return contents

    @staticmethod
    def _dump_nested_item(item: object, s: _OptionSerializer) -> list[str]:
        """
        Flatten a single item.

        Models are dumped via their own ``dump``; lists are prefixed with
        their length; tuples are flattened without a prefix; everything else
        is converted with :class:`str`.

        """

        contents = []

        if isinstance(item, _OptionSerializer.Model):
            contents.extend(item.dump(s))
        elif isinstance(item, list):
            contents.append(str(len(item)))
            for sub_item in item:
                contents.extend(_OptionSerializer._dump_nested_item(sub_item, s))
        elif isinstance(item, tuple):
            for sub_item in item:
                contents.extend(_OptionSerializer._dump_nested_item(sub_item, s))
        else:
            contents.append(str(item))

        return contents

    @dataclass
    class Model:
        # Tag written as the first element of the dumped model;
        # subclasses set it via the ``tag`` class keyword.
        tag: typing.ClassVar[str] = "-"

        def __init_subclass__(cls, tag: str = "-", **kwargs):
            super().__init_subclass__(**kwargs)
            cls.tag = tag

        def dump(self, s: _OptionSerializer) -> list[str]:
            # Layout: tag, number of following elements, flattened fields.
            contents = _OptionSerializer._dump_nested(
                (getattr(self, field.name) for field in dataclasses.fields(self)), s
            )
            return [self.tag, str(len(contents)), *contents]

        def get_completer_at_index(
            self, s: _CustomCompleterRegistrar, index: int
        ) -> Completer | None:
            # Models without custom completers have nothing to return.
            return None

    @dataclass
    class File(Model, tag="f"):
        ext: str

    @dataclass
    class Dir(Model, tag="d"):
        pass

    @dataclass
    class Choice(Model, tag="c"):
        choices: list[str]

        def dump(self, s: _OptionSerializer) -> list[str]:
            return [self.tag, str(len(self.choices)), *self.choices]

    @dataclass
    class ChoiceWithDesc(Model, tag="cd"):
        choices: list[tuple[str, str]]

        def dump(self, s: _OptionSerializer) -> list[str]:
            # All choice names come first, followed by all descriptions.
            return [
                self.tag,
                str(len(self.choices) * 2),
                *[c[0] for c in self.choices],
                *[s._process_help(c[1]) for c in self.choices],
            ]

    @dataclass
    class Git(Model, tag="g"):
        modes: str

    @dataclass
    class List(Model, tag="l"):
        delim: str
        inner: _OptionSerializer.Model

        def get_completer_at_index(
            self, s: _CustomCompleterRegistrar, index: int
        ) -> Completer | None:
            return self.inner.get_completer_at_index(s, index)

    @dataclass
    class ListMany(List, tag="lm"):
        pass

    @dataclass
    class Tuple(Model, tag="t"):
        delim: str
        inner: list[_OptionSerializer.Model]

        def get_completer_at_index(
            self, s: _CustomCompleterRegistrar, index: int
        ) -> Completer | None:
            for inner in self.inner:
                if completer := inner.get_completer_at_index(s, index):
                    return completer
            return None

    @dataclass
    class TupleMany(Tuple, tag="tm"):
        pass

    @dataclass
    class Alternative(Model, tag="a"):
        alternatives: list[tuple[str, _OptionSerializer.Model]]

        def get_completer_at_index(
            self, s: _CustomCompleterRegistrar, index: int
        ) -> Completer | None:
            for _, inner in self.alternatives:
                if completer := inner.get_completer_at_index(s, index):
                    return completer
            return None

    @dataclass
    class CustomCompleter(Model, tag="cc"):
        completer: Completer

        def dump(self, s: _OptionSerializer) -> list[str]:
            # Each dumped custom completer reserves an index in the registrar;
            # the index is embedded into the JSON payload.
            return [
                self.tag,
                "1",
                json.dumps(
                    {
                        "path": s._path,
                        "flags": s._flags,
                        "index": s._register_custom_completer(),
                    }
                ),
            ]

        def get_completer_at_index(
            self, s: _CustomCompleterRegistrar, index: int
        ) -> Completer | None:
            # Reserve an index the same way `dump` does, so that indices
            # seen during lookup match the ones assigned during dumping.
            this_index = s._register_custom_completer()
            if index == this_index:
                return self.completer
            else:
                return None

1365 

1366 

1367class _ProgramSerializer: 

1368 def __init__(self, path: str = "") -> None: 

1369 self._path = path 

1370 self._lines: list[str] = [] 

1371 self._positionals = 0 

1372 self._subcommands: dict[ 

1373 str, tuple[_ProgramSerializer, bool, str | yuio.Disabled] 

1374 ] = {} 

1375 

1376 def add_option( 

1377 self, 

1378 flags: list[str] | yuio.Positional, 

1379 nargs: str | int, 

1380 metavar: str | tuple[str, ...], 

1381 help: str | yuio.Disabled, 

1382 completer: Completer | None, 

1383 is_many: bool, 

1384 ): 

1385 if flags is yuio.POSITIONAL: 

1386 flags = [str(self._positionals)] 

1387 self._positionals += 1 

1388 if completer is None: 

1389 model = _OptionSerializer.Model() 

1390 else: 

1391 model = completer._get_completion_model(is_many=is_many) 

1392 self._add_option(flags, nargs, metavar, help, model) 

1393 

1394 def _add_option( 

1395 self, 

1396 flags: list[str], 

1397 nargs: str | int, 

1398 metavar: str | tuple[str, ...], 

1399 help: str | yuio.Disabled, 

1400 model: _OptionSerializer.Model, 

1401 ): 

1402 self._lines.append( 

1403 _OptionSerializer(flags, self._path, nargs, metavar, help).dump(model) 

1404 ) 

1405 

1406 def add_subcommand( 

1407 self, 

1408 name: str, 

1409 is_alias: bool, 

1410 help: str | yuio.Disabled, 

1411 ): 

1412 serializer = _ProgramSerializer(f"{self._path}/{name}") 

1413 self._subcommands[name] = (serializer, is_alias, help) 

1414 return serializer 

1415 

1416 def _dump(self): 

1417 if self._subcommands: 

1418 self._add_option( 

1419 ["c"], 

1420 1, 

1421 "<subcommand>", 

1422 "Subcommand.", 

1423 _OptionSerializer.ChoiceWithDesc( 

1424 [ 

1425 (name, help) 

1426 for name, (_, is_alias, help) in self._subcommands.items() 

1427 if not is_alias and help is not yuio.DISABLED 

1428 ] 

1429 ), 

1430 ) 

1431 

1432 for _, (serializer, _, _) in self._subcommands.items(): 

1433 self._lines.extend(serializer._dump()) 

1434 

1435 return self._lines 

1436 

1437 def dump(self): 

1438 return "\n".join(self._dump()) 

1439 

1440 

1441_PROG_ESCAPE = str.maketrans( 

1442 string.punctuation + string.whitespace, 

1443 "_" * (len(string.punctuation) + len(string.whitespace)), 

1444) 

1445 

1446 

def _run_completer_at_index(completer: Completer, is_many: bool, index: int, word: str):
    """
    Print completions for `word` if a custom completer exists at `index`.

    Each completion is printed to the original stdout as one line: the full
    completed text, a tab, and the completion's comment (empty if absent).
    Nothing is printed when the model has no custom completer at `index`.

    """

    model = completer._get_completion_model(is_many=is_many)
    found = model.get_completer_at_index(_CustomCompleterRegistrar(), index)
    if not found:
        return

    # It's up to user's shell to do corrections and derive common prefix.
    # NOTE(review): completions come from the top-level `completer`, not from
    # the completer found at `index` -- confirm that this is intended.
    for completion in completer.complete(
        word, len(word), do_corrections=False, derive_common_prefix=False
    ):
        print(
            f"{completion.iprefix}{completion.completion}{completion.isuffix}\t{completion.comment or ''}",
            file=sys.__stdout__,
        )

1461 

1462 

def _write_completions(compdata: str, prog: str | None = None, shell: str = "all"):
    """
    Install or uninstall completion data and shell completion scripts.

    :param compdata:
        serialized completion data that is written to the data directory.
    :param prog:
        program name; defaults to the stem of ``sys.argv[0]``.
    :param shell:
        ``"bash"``, ``"zsh"``, ``"fish"``, ``"pwsh"``, ``"all"``, or
        ``"uninstall"`` (removes completions for all shells).

    Exits the process with status 1 if the program was invoked as a module
    or if program name is empty or contains forbidden characters.

    """

    import yuio.io

    argv0_stem = pathlib.Path(sys.argv[0]).stem
    # `true_prog` is the command user types; `prog` is its escaped form
    # used in file names and passed to completion script templates.
    true_prog = prog or argv0_stem
    prog = true_prog.translate(_PROG_ESCAPE)

    if argv0_stem == "__main__":
        yuio.io.failure(
            "You've invoked this program as a python module, most likely with "
            "`python -m <module>`. For completions to work, the program "
            "must be invoked as a command in your `$PATH`"
        )
        sys.exit(1)
    if not prog:
        yuio.io.failure("Failed to generate completion because program name is empty")
        sys.exit(1)
    if not re.match(r"^[a-zA-Z0-9_-]+$", prog):
        yuio.io.failure(
            "Failed to generate completion due to "
            "forbidden characters in program name: `%r`",
            prog,
        )
        sys.exit(1)

    if shell == "uninstall":
        shell = "all"
        yuio.io.heading("Uninstalling completions for `%s`", true_prog)
        install = False
    else:
        yuio.io.heading("Generating completions for `%s`", true_prog)
        install = True

    if not shutil.which(true_prog):
        yuio.io.warning(
            "Program `%s` is not in your `$PATH`. Completions might not be able "
            "to initialize",
            true_prog,
        )

    if os.name == "nt":
        data_home = cache_home = config_home = pathlib.Path(
            os.environ.get("LOCALAPPDATA") or (pathlib.Path.home() / "AppData/Local")
        )
    else:
        data_home = pathlib.Path(
            os.environ.get("XDG_DATA_HOME") or (pathlib.Path.home() / ".local/share")
        )
        cache_home = pathlib.Path(
            os.environ.get("XDG_CACHE_HOME") or (pathlib.Path.home() / ".cache")
        )
        config_home = pathlib.Path(
            os.environ.get("XDG_CONFIG_HOME") or (pathlib.Path.home() / ".config")
        )

    compdata_path = data_home / f"yuio/{prog}.compdata.tsv"

    task_heading = "Installing completions" if install else "Uninstalling completions"

    # Shells are processed in this order; each entry is
    # (value of `shell`, task comment, script writer).
    shell_writers = (
        ("bash", "Bash", _write_bash_script),
        ("zsh", "Zsh", _write_zsh_script),
        ("fish", "Fish", _write_fish_script),
        ("pwsh", "PowerShell", _write_pwsh_script),
    )

    with yuio.io.Task(task_heading) as t:
        if install:
            os.makedirs(data_home / "yuio", exist_ok=True)
            compdata_path.write_text(compdata)
            yuio.io.info(
                "Wrote <c note>completion data</c> to <c path>%s</c>", compdata_path
            )
        elif compdata_path.exists():
            os.remove(compdata_path)
            yuio.io.info("Removed <c path>%s</c>", compdata_path)

        for shell_name, comment, write_shell_script in shell_writers:
            if shell in ("all", shell_name):
                t.comment(comment)
                write_shell_script(
                    prog,
                    true_prog,
                    install,
                    compdata_path,
                    data_home,
                    cache_home,
                    config_home,
                )

    yuio.io.success("All done! Please restart your shell for changes to take effect.")
    if install:
        # Show the real command name: the escaped `prog` may differ from what
        # the user has to type (e.g. ``my-app`` is escaped to ``my_app``).
        yuio.io.info(
            "Run `%s --completions uninstall` to undo all changes.", true_prog
        )

1583 

1584 

def _write_bash_script(
    prog: str,
    true_prog: str,
    install: bool,
    compdata_path: pathlib.Path,
    data_home: pathlib.Path,
    cache_home: pathlib.Path,
    config_home: pathlib.Path,
):
    """
    Install or remove the Bash completion script for `true_prog`.

    The script is placed into the user's bash-completion directory, as
    reported by a login Bash shell; if Bash can't be queried, a directory
    under `data_home` is used. Does nothing on Windows, and skips
    installation when ``bash`` is not available.

    """

    import yuio.exec
    import yuio.io

    if os.name == "nt":
        yuio.io.warning(
            "Skipped <c note>Bash</c>: completion script doesn't support windows"
        )
        return

    if install and not shutil.which("bash"):
        yuio.io.warning("Skipped <c note>Bash</c>: `bash` command is not available")
        return

    try:
        # Only the last line of output is used.
        completions_dir = yuio.exec.exec(
            "bash",
            "-lc",
            'echo -n "${BASH_COMPLETION_USER_DIR:-${XDG_DATA_HOME:-$HOME/.local/share}/bash-completion}/completions/"',
        ).splitlines()[-1]
    except (subprocess.CalledProcessError, IndexError, FileNotFoundError):
        completions_dir = data_home / "bash-completion/completions/"

    script_dest = pathlib.Path(completions_dir) / true_prog

    if install:
        _write_script(script_dest, "complete.bash", prog, true_prog, compdata_path)
        yuio.io.info("Wrote <c note>Bash</c> script to <c path>%s</c>", script_dest)
        return

    if script_dest.exists():
        os.remove(script_dest)
        yuio.io.info("Removed <c path>%s</c>", script_dest)

1624 

1625 

def _write_zsh_script(
    prog: str,
    true_prog: str,
    install: bool,
    compdata_path: pathlib.Path,
    data_home: pathlib.Path,
    cache_home: pathlib.Path,
    config_home: pathlib.Path,
):
    """
    Install or remove the Zsh completion script for `true_prog`.

    On install, the script is written to ``zsh/completions`` under
    `data_home`, and ``.zprofile`` is extended to add this directory to
    ``fpath`` unless a login Zsh shell already reports it there. On uninstall,
    the script is removed but ``.zprofile`` is left as is. Whenever the script
    was written or removed, Zsh completion caches are deleted from the most
    common locations. Does nothing on Windows, and skips installation
    when ``zsh`` is not available.

    """

    import yuio.exec
    import yuio.io

    if os.name == "nt":
        yuio.io.warning(
            "Skipped <c note>Zsh</c>: completion script doesn't support windows"
        )
        return

    if install and not shutil.which("zsh"):
        yuio.io.warning("Skipped <c note>Zsh</c>: `zsh` command is not available")
        return

    needs_cache_cleanup = False

    zsh_completions_home = data_home / "zsh/completions"

    # Only create the directory when installing: uninstall must not leave
    # new directories behind.
    if install and not zsh_completions_home.exists():
        zsh_completions_home.mkdir(parents=True)
        # Completions home needs rwxr-xr-x, otherwise zsh will not load
        # our completion scripts.
        zsh_completions_home.chmod(mode=0o755)

    script_dest = zsh_completions_home / ("_" + true_prog)

    if install:
        needs_cache_cleanup = True
        _write_script(script_dest, "complete.zsh", prog, true_prog, compdata_path)
        yuio.io.info("Wrote <c note>Zsh</c> script to <c path>%s</c>", script_dest)
    elif script_dest.exists():
        needs_cache_cleanup = True

        os.remove(script_dest)
        yuio.io.info("Removed <c path>%s</c>", script_dest)

    try:
        fpath = (
            yuio.exec.exec(
                "zsh",
                "-lc",
                "echo -n $FPATH",
            )
            .splitlines()[-1]
            .split(":")
        )
    except (subprocess.CalledProcessError, IndexError, FileNotFoundError):
        fpath = []

    try:
        zhome = yuio.exec.exec(
            "zsh",
            "-lc",
            "echo -n ${ZDOTDIR:-$HOME}",
        ).splitlines()[-1]
    except (subprocess.CalledProcessError, IndexError, FileNotFoundError):
        zhome = pathlib.Path.home()

    zhome = pathlib.Path(zhome)
    zprofile_path = zhome / ".zprofile"
    zprofile_append_text = f"\nfpath=({zsh_completions_home} $fpath)\n"

    if install:
        if str(zsh_completions_home) not in fpath:
            with open(zprofile_path, "a") as f:
                f.write(zprofile_append_text)
            yuio.io.info(
                "<c note>Note:</c> modified <c path>%s</c> to add <c path>%s</c> to `fpath`",
                zprofile_path,
                zsh_completions_home,
            )
    elif zprofile_path.exists():
        zprofile_text = zprofile_path.read_text()
        if zprofile_append_text in zprofile_text:
            yuio.io.info(
                "<c note>Note:</c> modifications to <c path>%s</c> are not removed"
                " because other completions might rely on them",
                zprofile_path,
            )

    if not needs_cache_cleanup:
        return

    # Try to remove completions cache from the most common places.
    for zcomp_basedir in [zhome, cache_home / "prezto"]:
        if not zcomp_basedir.exists() or not zcomp_basedir.is_dir():
            continue
        for file in zcomp_basedir.iterdir():
            if file.is_file() and re.match(r"^\.?zcompdump", file.name):
                os.remove(file)
                yuio.io.info(
                    "<c note>Note:</c> deleted Zsh completions cache at <c path>%s</c>",
                    file,
                )

    try:
        # Run zsh with the right flags in case zshrc runs compinit.
        # If after generating completions user runs `zsh` without the `-l` flag,
        # our changes to fpath will not be visible, and compinit will dump
        # an invalid version of cache. To avoid this, we call zsh ourselves
        # before the user has a chance to do it. Notice, though, that we don't
        # run `compdump`. This is because we can't be sure that the user uses
        # the default cache path (~/.zcompdump).
        yuio.exec.exec("zsh", "-lc", "true")
    except (subprocess.CalledProcessError, FileNotFoundError):
        pass

1740 

1741 

def _write_fish_script(
    prog: str,
    true_prog: str,
    install: bool,
    compdata_path: pathlib.Path,
    data_home: pathlib.Path,
    cache_home: pathlib.Path,
    config_home: pathlib.Path,
):
    """
    Install or remove the Fish completion script for `true_prog`.

    The script lives in ``fish/vendor_completions.d`` under `data_home`.
    Does nothing on Windows, and skips installation when ``fish``
    is not available.

    """

    import yuio.io

    if os.name == "nt":
        yuio.io.warning(
            "Skipped <c note>Fish</c>: completion script doesn't support windows"
        )
        return

    if install and not shutil.which("fish"):
        yuio.io.warning("Skipped <c note>Fish</c>: `fish` command is not available")
        return

    script_dest = data_home / "fish/vendor_completions.d" / (true_prog + ".fish")

    if install:
        _write_script(script_dest, "complete.fish", prog, true_prog, compdata_path)
        yuio.io.info("Wrote <c note>Fish</c> script to <c path>%s</c>", script_dest)
        return

    if script_dest.exists():
        os.remove(script_dest)
        yuio.io.info("Removed <c path>%s</c>", script_dest)

1772 

1773 

def _write_pwsh_script(
    prog: str,
    true_prog: str,
    install: bool,
    compdata_path: pathlib.Path,
    data_home: pathlib.Path,
    cache_home: pathlib.Path,
    config_home: pathlib.Path,
):
    """
    Install or remove the PowerShell completion script for `true_prog`.

    Uses ``pwsh`` if available, otherwise ``powershell``. The script and
    a shared loader are placed into ``yuio/pwsh`` under `data_home`;
    on install, the PowerShell profile is extended to source the loader
    unless the running shell already reports this directory as initialized.
    On uninstall, the script is removed but the profile is left as is.

    """

    import yuio.exec
    import yuio.io

    if shutil.which("pwsh"):
        command = "pwsh"
    elif shutil.which("powershell"):
        command = "powershell"
    else:
        yuio.io.warning(
            "Skipped <c note>PowerShell</c>: `pwsh` command is not available"
        )
        return

    try:
        # Only the last line of output is used: `<version>;<profile path>`.
        pwsh_data = (
            yuio.exec.exec(
                command,
                "-Command",
                'Write-Host "$($PSVersionTable.PSVersion);$PROFILE"',
            )
            .splitlines()[-1]
            .strip()
        )
    except (subprocess.CalledProcessError, IndexError) as e:
        yuio.io.warning(
            "Skipped <c note>PowerShell</c>: failed to get powershell `$PROFILE` path: %s",
            e,
        )
        return
    except FileNotFoundError:
        yuio.io.warning("Skipped <c note>PowerShell</c>: `pwsh` command not found")
        return
    if match := re.match(r"^(\d+(?:\.\d+)*);(.*)$", pwsh_data):
        version = match.group(1)
        profile_s = match.group(2)
    else:
        yuio.io.warning(
            "Skipped <c note>PowerShell</c>: can't determine powershell version"
        )
        return
    if not profile_s:
        yuio.io.warning(
            "Skipped <c note>PowerShell</c>: powershell `$PROFILE` path is empty"
        )
        return
    # Compare with a one-element tuple: tuples are ordered lexicographically
    # and a shorter prefix sorts first, so comparing with `(5, 0, 0)` would
    # reject versions reported as `5` or `5.0`.
    if tuple(int(v) for v in version.split(".")) < (5,):
        yuio.io.warning(
            "Skipped <c note>PowerShell</c>: completions script requires "
            "PowerShell 5 or newer, you have %s",
            version,
        )
        return

    profile_path = pathlib.Path(profile_s).expanduser().resolve()

    data_dir = data_home / "yuio/pwsh"
    loader_path = data_dir / "LoadCompletions.ps1"
    script_dest = data_dir / f"_{true_prog}.ps1"
    if install:
        _write_script(script_dest, "complete.ps1", prog, true_prog, compdata_path)
        yuio.io.info(
            "Wrote <c note>PowerShell</c> script to <c path>%s</c>", script_dest
        )
        _write_pwsh_loader(loader_path, data_dir)
    elif script_dest.exists():
        os.remove(script_dest)
        yuio.io.info("Removed <c path>%s</c>", script_dest)

    try:
        # Ask the shell which data directories are already initialized.
        data_dirs = [
            pathlib.Path(f).expanduser().resolve()
            for f in yuio.exec.exec(
                command,
                "-Command",
                'Write-Host ($_YUIO_COMPL_V1_INIT_PATHS -join "`n")',
            )
            .strip()
            .splitlines()
        ]
    except (subprocess.CalledProcessError, FileNotFoundError):
        return

    pwsh_profile_append_text = f"\n. {loader_path}\n"

    if install:
        if data_dir not in data_dirs:
            # Create profile's directory only when the profile is actually
            # modified, so that uninstall never leaves new directories behind.
            profile_path.parent.mkdir(exist_ok=True, parents=True)
            with open(profile_path, "a") as f:
                f.write(pwsh_profile_append_text)
            yuio.io.info(
                "<c note>Note:</c> modified <c path>%s</c> to call <c path>%s</c> on startup",
                profile_path,
                loader_path,
            )
    elif profile_path.exists():
        pwsh_profile_text = profile_path.read_text()
        if pwsh_profile_append_text in pwsh_profile_text:
            yuio.io.info(
                "<c note>Note:</c> modifications to <c path>%s</c> are not removed"
                " because other completions might rely on them",
                profile_path,
            )

1885 

1886 

def _write_script(
    path: pathlib.Path,
    script_name: str,
    prog: str,
    true_prog: str,
    compdata_path: pathlib.Path,
):
    """
    Render a bundled script template and write it to `path`.

    Placeholders ``@prog@``, ``@true_prog@``, ``@data@`` and ``@version@``
    are substituted in this order. Parent directories are created as needed,
    and the resulting file gets mode ``0o755``.

    """

    substitutions = (
        ("@prog@", prog),
        ("@true_prog@", true_prog),
        ("@data@", str(compdata_path)),
        ("@version@", yuio.__version__),
    )

    script = _read_script(script_name)
    for placeholder, value in substitutions:
        script = script.replace(placeholder, value)

    path.parent.mkdir(exist_ok=True, parents=True)
    path.write_text(script)
    path.chmod(0o755)

1906 

1907 

def _read_script(script_name: str):
    """
    Return text of a bundled file from the ``_complete`` directory.

    The file is read from the archive when this module is loaded by
    a zip importer, and from the directory next to this module otherwise.

    """

    import zipfile
    import zipimport

    if not isinstance(__loader__, zipimport.zipimporter):
        bundled_dir = pathlib.Path(__file__).parent / "_complete"
        return (bundled_dir / script_name).read_text()

    # Yuio is imported directly from a wheel.
    with zipfile.ZipFile(__loader__.archive) as archive:
        raw_template = archive.read("yuio/_complete/" + script_name)
    return raw_template.decode()

1920 

1921 

1922def _write_pwsh_loader(loader_path: pathlib.Path, data_dir: pathlib.Path): 

1923 import yuio.io 

1924 

1925 loader_template_path = pathlib.Path(__file__).parent / "_complete/complete_init.ps1" 

1926 loader_template = loader_template_path.read_text() 

1927 

1928 loader_version = re.search( 

1929 r"^\s*#\s*LOADER_VERSION:\s*(\d+)\s*$", loader_template, re.MULTILINE 

1930 ) 

1931 assert loader_version 

1932 

1933 if loader_path.exists() and loader_path.is_file(): 

1934 current_loader = loader_path.read_text() 

1935 current_version_s = re.search( 

1936 r"^\s*#\s*LOADER_VERSION:\s*(\d+)\s*$", current_loader, re.MULTILINE 

1937 ) 

1938 

1939 if current_version_s is None: 

1940 yuio.io.warning( 

1941 "<c note>Note:</c> can't determine version of <c path>%s</c>, " 

1942 "file will be overridden", 

1943 loader_path, 

1944 ) 

1945 elif int(loader_version.group(1)) <= int(current_version_s.group(1)): 

1946 return 

1947 

1948 loader_template = loader_template.replace("@data@", str(data_dir)) 

1949 loader_path.write_text(loader_template) 

1950 loader_path.chmod(0o755) 

1951 yuio.io.info("Wrote <c note>PowerShell</c> script to <c path>%s</c>", loader_path)