Coverage for yuio / complete.py: 78%

753 statements  

« prev     ^ index     » next       coverage.py v7.13.3, created at 2026-02-03 15:42 +0000

1# Yuio project, MIT license. 

2# 

3# https://github.com/taminomara/yuio/ 

4# 

5# You're free to copy this file to your project and edit it for your needs, 

6# just keep this copyright line please :3 

7 

8""" 

9This module provides autocompletion functionality for widgets and CLI. 

10 

11 

12Completer basics 

13---------------- 

14 

15All completers are derived from the :class:`Completer` base class 

16with a simple interface: 

17 

18.. autoclass:: Completer 

19 :members: 

20 

21.. autoclass:: Completion 

22 :members: 

23 

24 

25Completers 

26---------- 

27 

28Yuio provides basic completers that cover most of the cases: 

29 

30.. autoclass:: Empty 

31 

32.. autoclass:: Alternative 

33 

34.. autoclass:: Choice 

35 

36.. autoclass:: Option 

37 :members: 

38 

39.. autoclass:: List 

40 

41.. autoclass:: Tuple 

42 

43.. autoclass:: File 

44 

45.. autoclass:: Dir 

46 

47 

48Implementing your own completer 

49------------------------------- 

50 

51To implement a custom completer, subclass :class:`Completer` and implement 

52its :meth:`~Completer._process` method. 

53 

54.. note:: 

55 

56 When using a custom completer for CLI flags in :mod:`yuio.app`, 

57 completion script will invoke your program with special arguments 

58 to run the completer and get its result. 

59 

60.. class:: Completer 

61 :noindex: 

62 

63 .. automethod:: _process 

64 

65.. autoclass:: CompletionCollector 

66 

67""" 

68 

69from __future__ import annotations 

70 

71import abc 

72import contextlib 

73import dataclasses 

74import functools 

75import json 

76import math 

77import os 

78import pathlib 

79import re 

80import shutil 

81import string 

82import subprocess 

83import sys 

84from dataclasses import dataclass 

85 

86import yuio 

87import yuio.string 

88from yuio.util import commonprefix as _commonprefix 

89 

90import typing 

91import yuio._typing_ext as _tx 

92from typing import TYPE_CHECKING 

93 

94if TYPE_CHECKING: 

95 import typing_extensions as _t 

96else: 

97 from yuio import _typing as _t 

98 

99__all__ = [ 

100 "Alternative", 

101 "Choice", 

102 "Completer", 

103 "Completion", 

104 "CompletionCollector", 

105 "Dir", 

106 "Empty", 

107 "File", 

108 "List", 

109 "Option", 

110 "Tuple", 

111] 

112 

113 

114@dataclass(frozen=True, slots=True) 

115@functools.total_ordering 

116class Completion: 

117 """ 

118 A single completion. 

119 

120 """ 

121 

122 iprefix: str 

123 """ 

124 See :class:`CompletionCollector.iprefix` for details. 

125 

126 """ 

127 

128 completion: str 

129 """ 

130 Text of the completion. 

131 

132 """ 

133 

134 rsuffix: str 

135 """ 

136 See :class:`CompletionCollector.rsuffix` for details. 

137 

138 """ 

139 

140 rsymbols: str 

141 """ 

142 See :class:`CompletionCollector.rsymbols` for details. 

143 

144 """ 

145 

146 isuffix: str 

147 """ 

148 See :class:`CompletionCollector.isuffix` for details. 

149 

150 """ 

151 

152 comment: str | None 

153 """ 

154 Short comment displayed alongside the completion. 

155 

156 """ 

157 

158 dprefix: str 

159 """ 

160 Prefix that will be displayed before :attr:`~Completion.completion` 

161 when listing completions, but will not be inserted once completion 

162 is applied. 

163 

164 """ 

165 

166 dsuffix: str 

167 """ 

168 Like :attr:`~Completion.dprefix`, but it's a suffix. 

169 

170 """ 

171 

172 group_id: _tx.SupportsLt[_t.Any] = dataclasses.field(repr=False) 

173 """ 

174 Group id, used to sort completions. 

175 

176 Actual content of this property is an implementation detail. 

177 

178 """ 

179 

180 group_color_tag: str | None 

181 """ 

182 Color tag that's used when displaying this completion. 

183 

184 See :meth:`CompletionCollector.add_group` for details. 

185 

186 """ 

187 

188 def __lt__(self, other: Completion) -> bool: 

189 """ 

190 Completions are ordered by their groups and then alphabetically. 

191 

192 """ 

193 

194 return self.group_id < other.group_id or ( 

195 self.group_id == other.group_id and self.completion < other.completion 

196 ) 

197 

198 

199@dataclass(init=False, eq=False, repr=False, match_args=False) 

200class CompletionCollector: 

201 """ 

202 A class that collects completions as completers are running. 

203 

204 The text that is being completed is split into four parts, similar 

205 to what you might see in ZSH completion widgets. The main two are: 

206 

207 .. autoattribute:: prefix 

208 

209 .. autoattribute:: suffix 

210 

211 When completions are added to the collector, they are checked against 

212 the current prefix to determine if they match the entered text. If they 

213 do, the completion system will replace text from `prefix` and `suffix` 

214 with the new completion string. 

215 

216 The two additional parts are: 

217 

218 .. autoattribute:: iprefix 

219 

220 .. autoattribute:: isuffix 

221 

222 For example, suppose you're completing a second element 

223 of a colon-separated list. The list completer will set up 

224 the collector so that `prefix` and `suffix` contain parts of the 

225 current list element, while `iprefix` and `isuffix` contain 

226 the rest of the elements: 

227 

228 .. code-block:: text 

229 

230 list_element_1:list_el|ement_2:list_element_3 

231 └┬────────────┘└┬────┘│└┬────┘└┬────────────┘ 

232 iprefix prefix │ suffix isuffix 

233 └ cursor 

234 

235 Now, if the completer adds a completion ``"list_elements"``, 

236 this text will replace the `prefix` and `suffix`, but not `iprefix` 

237 and `isuffix`. So, after the completion is applied, the string will 

238 look like so: 

239 

240 .. code-block:: text 

241 

242 list_element_1:list_elements:list_element_3 

243 └┬──────────┘ 

244 this got replaced 

245 

246 Finally, there is `rsuffix`: 

247 

248 .. autoattribute:: rsuffix 

249 

250 .. autoattribute:: rsymbols 

251 

252 So, when completing a colon-separated list, colons will be added and removed 

253 automatically, similar to how ZSH does it. 

254 

255 .. autoattribute:: dedup_words 

256 

257 .. autoattribute:: full_prefix 

258 

259 .. autoattribute:: full_suffix 

260 

261 .. autoattribute:: text 

262 

263 .. autoattribute:: num_completions 

264 

265 .. automethod:: add 

266 

267 .. automethod:: add_group 

268 

269 .. automethod:: save_state 

270 

271 .. automethod:: split_off_prefix 

272 

273 .. automethod:: split_off_suffix 

274 

275 .. automethod:: finalize 

276 

277 """ 

278 

279 iprefix: str 

280 """ 

281 Contains text that goes before the :attr:`~CompletionCollector.prefix`. 

282 

283 This prefix is not considered when checking whether a completion 

284 matches a text, and it is not replaced by the completion. It will also 

285 not be shown in the table of completions. 

286 

287 This prefix starts empty, and then parts of :attr:`~CompletionCollector.prefix` 

288 are moved to :attr:`~CompletionCollector.iprefix` as completers split it into 

289 list elements. 

290 

291 """ 

292 

293 prefix: str 

294 """ 

295 Portion of the completed text before the cursor. 

296 

297 """ 

298 

299 suffix: str 

300 """ 

301 Portion of the completed text after the cursor. 

302 

303 """ 

304 

305 rsuffix: str 

306 """ 

307 Starts empty, and may be set to hold a list separator. 

308 

309 This suffix will be added after the completion. However, it will be automatically 

310 removed if the user types one of :attr:`CompletionCollector.rsymbols`, 

311 or moves cursor, or alters input in some other way. 

312 

313 This property is mutable and can be changed by completers. 

314 

315 """ 

316 

317 rsymbols: str 

318 """ 

319 If user types one of the symbols from this string, 

320 :attr:`~.CompletionCollector.rsuffix` will be removed. 

321 

322 This property is mutable and can be changed by completers. 

323 

324 """ 

325 

326 isuffix: str 

327 """ 

328 Similar to :attr:`CompletionCollector.iprefix`, but for suffixes. 

329 

330 """ 

331 

332 dedup_words: frozenset[str] 

333 """ 

334 Completions from this set will not be added. This is useful 

335 when completing lists of unique values. 

336 

337 This property is mutable and can be changed by completers. 

338 

339 """ 

340 

341 # Internal fields. 

342 _group_id: int 

343 _group_sorted: bool 

344 _group_color_tag: str | None 

345 

346 def __init__(self, text: str, pos: int, /): 

347 self.iprefix = "" 

348 self.prefix = text[:pos] 

349 self.suffix = text[pos:] 

350 self.rsuffix = "" 

351 self.rsymbols = "" 

352 self.isuffix = "" 

353 self.dedup_words = frozenset() 

354 

355 self._group_id = 0 

356 self._group_sorted = True 

357 self._group_color_tag = None 

358 

359 self._completions: list[Completion] = [] 

360 

361 @property 

362 def full_prefix(self) -> str: 

363 """ 

364 Portion of the final completed text that goes before the cursor. 

365 

366 """ 

367 

368 return self.iprefix + self.prefix 

369 

370 @property 

371 def full_suffix(self) -> str: 

372 """ 

373 Portion of the final completed text that goes after the cursor. 

374 

375 """ 

376 

377 return self.suffix + self.isuffix 

378 

379 @property 

380 def text(self) -> str: 

381 """ 

382 Portion of the text that is being autocompleted. 

383 

384 """ 

385 

386 return self.prefix + self.suffix 

387 

388 @contextlib.contextmanager 

389 def save_state(self): 

390 """ 

391 Save current state of the collector, i.e. prefixes, 

392 suffixes, etc., upon entering this context manager, 

393 then restore state upon exiting. 

394 

395 Use this context manager when you need to call nested 

396 completers more than once to prevent changes made in 

397 one nested completer bleeding out into another 

398 nested completer. 

399 

400 """ 

401 

402 state = {f.name: getattr(self, f.name) for f in dataclasses.fields(self)} 

403 

404 try: 

405 yield 

406 finally: 

407 for name, value in state.items(): 

408 setattr(self, name, value) 

409 

410 def add( 

411 self, 

412 completion: str, 

413 /, 

414 *, 

415 comment: str | None = None, 

416 dprefix: str = "", 

417 dsuffix: str = "", 

418 color_tag: str | None = None, 

419 ): 

420 """ 

421 Add a new completion. 

422 

423 :param completion: 

424 completed text without :attr:`~CompletionCollector.iprefix` 

425 and :attr:`~CompletionCollector.isuffix`. This text will replace 

426 :attr:`~CompletionCollector.prefix` and :attr:`~CompletionCollector.suffix`. 

427 :param comment: 

428 additional comment that will be displayed near the completion. 

429 :param color_tag: 

430 allows overriding color tag from the group. 

431 

432 """ 

433 

434 if ( 

435 completion 

436 and completion not in self.dedup_words 

437 and completion.startswith(self.prefix) 

438 ): 

439 self._add( 

440 completion, 

441 comment=comment, 

442 dprefix=dprefix, 

443 dsuffix=dsuffix, 

444 color_tag=color_tag, 

445 ) 

446 

447 def _add( 

448 self, 

449 completion: str, 

450 /, 

451 *, 

452 comment: str | None = None, 

453 dprefix: str = "", 

454 dsuffix: str = "", 

455 color_tag: str | None = None, 

456 ): 

457 if not self.isuffix or self.isuffix[0] in string.whitespace: 

458 # Only add `rsuffix` if we're at the end of an array element. 

459 # Don't add `rsuffix` if we're in the middle of an array, unless the array 

460 # is separated by spaces. 

461 rsuffix = self.rsuffix 

462 rsymbols = self.rsymbols 

463 else: 

464 rsuffix = "" 

465 rsymbols = "" 

466 

467 if self._group_sorted: 

468 group_id = (self._group_id, 0) 

469 else: 

470 group_id = (self._group_id, len(self._completions)) 

471 

472 if color_tag is None: 

473 color_tag = self._group_color_tag 

474 

475 self._completions.append( 

476 Completion( 

477 iprefix=self.iprefix, 

478 completion=completion, 

479 rsuffix=rsuffix, 

480 rsymbols=rsymbols, 

481 isuffix=self.isuffix, 

482 comment=comment, 

483 dprefix=dprefix, 

484 dsuffix=dsuffix, 

485 group_id=group_id, 

486 group_color_tag=color_tag, 

487 ) 

488 ) 

489 

490 def add_group(self, /, *, sorted: bool = True, color_tag: str | None = None): 

491 """ 

492 Add a new completions group. 

493 

494 All completions added after call to this method will be placed to the new group. 

495 They will be grouped together, and colored according to the group's color tag. 

496 

497 :param sorted: 

498 controls whether completions in the new group 

499 should be sorted lexicographically. 

500 :param color_tag: 

501 which color tag should be used to display completions 

502 and their help messages for this group. 

503 

504 See :attr:`yuio.widget.Option.color_tag` for details. 

505 

506 """ 

507 

508 self._group_id += 1 

509 self._group_sorted = sorted 

510 self._group_color_tag = color_tag 

511 

512 @property 

513 def num_completions(self) -> int: 

514 """ 

515 Number of completions added so far. 

516 

517 """ 

518 

519 return len(self._completions) 

520 

521 def split_off_prefix(self, delim: str | None = None, /): 

522 """ 

523 Move everything up to the last occurrence of `delim` 

524 from :attr:`~CompletionCollector.prefix` 

525 to :attr:`~CompletionCollector.iprefix`. 

526 

527 :param delim: 

528 delimiter to split off; :data:`None` value splits off on any whitespace 

529 character, similar to :meth:`str.rsplit`. 

530 

531 """ 

532 

533 delim = delim or " " 

534 parts = self.prefix.rsplit(delim, maxsplit=1) 

535 if len(parts) > 1: 

536 self.iprefix += parts[0] + delim 

537 self.prefix = parts[1] 

538 

539 def split_off_suffix(self, delim: str | None = None, /): 

540 """ 

541 Move everything past the first occurrence of `delim` 

542 from :attr:`~CompletionCollector.suffix` 

543 to :attr:`~CompletionCollector.isuffix`. 

544 

545 :param delim: 

546 delimiter to split off; :data:`None` value splits off on any whitespace 

547 character, similar to :meth:`str.split`. 

548 

549 """ 

550 

551 delim = delim or " " 

552 parts = self.suffix.split(delim, maxsplit=1) 

553 if len(parts) > 1: 

554 self.suffix = parts[0] 

555 self.isuffix = delim + parts[1] + self.isuffix 

556 

557 def finalize(self, *, derive_common_prefix: bool = True) -> list[Completion]: 

558 """ 

559 Finish collecting completions and return everything that was collected. 

560 

561 Do not reuse a collector after it was finalized. 

562 

563 :returns: 

564 list of completions, sorted by their groups and preferred ordering 

565 within each group. 

566 

567 If all completions start with a common prefix, a single completion 

568 is returned containing this prefix. 

569 

570 """ 

571 

572 if len(self._completions) > 1: 

573 c0 = self._completions[0] 

574 

575 iprefix = c0.iprefix 

576 isuffix = c0.isuffix 

577 

578 if ( 

579 derive_common_prefix 

580 and self.full_prefix.startswith(iprefix) 

581 and self.full_suffix.endswith(isuffix) 

582 and all( 

583 c.iprefix == iprefix and c.isuffix == isuffix 

584 for c in self._completions 

585 ) 

586 ): 

587 # If all completions have the same `iprefix` and `isuffix`... 

588 common_prefix = _commonprefix( 

589 list(c.completion for c in self._completions) 

590 ) 

591 if common_prefix and len(iprefix) + len(common_prefix) > len( 

592 self.iprefix 

593 ) + len(self.prefix): 

594 # ...and they have a common prefix that is longer than what's entered so far, 

595 # then complete this common prefix. 

596 rsuffix = "" 

597 rsymbols = "" 

598 if all( 

599 common_prefix == c.completion and rsuffix == c.rsuffix 

600 for c in self._completions 

601 ): 

602 # If completing common prefix actually fulfills a completion, add `rsuffix` as well. 

603 rsuffix = c0.rsuffix 

604 rsymbols = c0.rsymbols 

605 return [ 

606 Completion( 

607 iprefix=iprefix, 

608 completion=common_prefix, 

609 rsuffix=rsuffix, 

610 rsymbols=rsymbols, 

611 isuffix=isuffix, 

612 comment=None, 

613 dprefix="", 

614 dsuffix="", 

615 group_id=(0, 0), 

616 group_color_tag=None, 

617 ) 

618 ] 

619 

620 self._completions.sort() 

621 return self._completions 

622 

623 

# Number of edits that is always tolerated, regardless of text length.
_MAX_COMPLETION_CORRECTIONS: int = 1
# Extra tolerated edits per character of the average length
# of the entered text and the candidate completion.
_MAX_COMPLETION_CORRECTIONS_RATE: float = 1 / 3


@_t.final
class _CorrectingCollector(CompletionCollector):
    """
    Collector that accepts completions which are within a small edit
    distance of the entered text, instead of requiring a prefix match.

    """

    def __init__(self, text: str, pos: int):
        super().__init__(text, pos)

        # Set once at least one completion lands in the "corrected" group.
        self._has_corrections = False

    def add(
        self,
        completion: str,
        /,
        *,
        comment: str | None = None,
        dprefix: str = "",
        dsuffix: str = "",
        color_tag: str | None = None,
    ):
        """
        Add a completion if its edit distance to the text being completed
        is small enough.

        Candidates at distance of at most one are added as usual; candidates
        within the length-dependent threshold are added to a separate
        "corrected" group; everything else is dropped.

        """

        if not completion or completion in self.dedup_words:
            return

        a = self.prefix + self.suffix
        b = completion
        corrections = _corrections(a, b)
        threshold = (
            _MAX_COMPLETION_CORRECTIONS
            + _MAX_COMPLETION_CORRECTIONS_RATE * (len(a) + len(b)) / 2
        )

        if corrections <= 1:
            # this is a simple mistype, add it as usual
            self._add(
                completion,
                comment=comment,
                dprefix=dprefix,
                dsuffix=dsuffix,
                color_tag=color_tag,
            )
        elif corrections <= threshold:
            # this is a correction, add it into corrections group
            if comment:
                comment = "corrected: " + comment
            else:
                comment = "corrected"
            with self.save_state():
                self._group_id = 0xFFFFFFFE  # (big enough) - 1
                self._group_color_tag = "corrected"
                self._add(
                    completion,
                    comment=comment,
                    dprefix=dprefix,
                    dsuffix=dsuffix,
                    color_tag=color_tag,
                )
            self._has_corrections = True

    def finalize(self, *, derive_common_prefix: bool = True) -> list[Completion]:
        """
        Return collected completions, sorted.

        If any corrections were collected, the originally entered text
        is appended as the last option so that it can be kept as is.

        :param derive_common_prefix:
            accepted for interface compatibility; this collector never
            collapses completions into a common prefix.

        """

        if self._has_corrections:
            c0 = self._completions[0]

            iprefix = ""
            prefix = self.full_prefix
            suffix = self.full_suffix
            isuffix = ""

            if prefix.startswith(c0.iprefix):
                cut = len(c0.iprefix)
                iprefix, prefix = prefix[:cut], prefix[cut:]

            if suffix.endswith(c0.isuffix):
                # Index from the front: `suffix[-0:]` would be the whole string
                # and `suffix[:-0]` would be empty when `isuffix` is empty.
                cut = len(suffix) - len(c0.isuffix)
                suffix, isuffix = suffix[:cut], suffix[cut:]

            # If we have corrections, add original value to the end.
            with self.save_state():
                self._group_id = 0xFFFFFFFF  # (big enough)
                self._group_color_tag = "original"
                self.iprefix = iprefix
                self.isuffix = isuffix
                self._add(prefix + suffix, comment="original")

        self._completions.sort()
        return self._completions

712 

713 

def _corrections(a: str, b: str) -> float:
    """
    Return the case-insensitive Optimal String Alignment distance
    (restricted Damerau–Levenshtein distance) between `a` and `b`.

    Insertions, deletions, substitutions and transpositions of two adjacent
    characters each cost one edit.

    """

    a = a.casefold()
    b = b.casefold()

    # d[i][j] is the distance between a[:i] and b[:j].
    d = [[0.0] * (len(b) + 1) for _ in range(len(a) + 1)]
    for i in range(len(a) + 1):
        d[i][0] = i
    for j in range(len(b) + 1):
        d[0][j] = j

    for i in range(1, len(a) + 1):
        for j in range(1, len(b) + 1):
            mismatch = a[i - 1] != b[j - 1]
            d[i][j] = min(
                # Add to `a`:
                d[i - 1][j] + 1,
                # Add to `b`:
                d[i][j - 1] + 1,
                # Replace:
                d[i - 1][j - 1] + mismatch,
                # Transpose; needs two characters on each side, which
                # includes the very first pair (i == 2, j == 2):
                (
                    d[i - 2][j - 2] + mismatch
                    if i > 1
                    and j > 1
                    and a[i - 1] == b[j - 2]
                    and a[i - 2] == b[j - 1]
                    else math.inf
                ),
            )

    return d[-1][-1]

742 

743 

class Completer(abc.ABC):
    """
    An interface for text completion providers.

    """

    def complete(
        self,
        text: str,
        pos: int,
        /,
        *,
        do_corrections: bool = True,
        derive_common_prefix: bool = True,
    ) -> list[Completion]:
        """
        Complete the given text at the given cursor position.

        :param text:
            text that is being completed.
        :param pos:
            position of the cursor in the text. ``0`` means the cursor
            is before the first character, ``len(text)`` means the cursor
            is after the last character.
        :param do_corrections:
            if :data:`True` (default), completion system will try to guess
            if there are any misspells in the `text`, and offer to correct them.
        :param derive_common_prefix:
            if :data:`True` (default), and all returned completions have a non-empty
            common prefix, return a single completion with this prefix instead.
        :returns:
            a sorted list of completions.

            If all completions start with a common prefix, a single completion
            is returned containing this prefix.

        """

        # First pass: plain prefix matching.
        collector = CompletionCollector(text, pos)
        with collector.save_state():
            self._process(collector)
        completions = collector.finalize(derive_common_prefix=derive_common_prefix)
        if completions or not do_corrections:
            return completions

        # Second pass, only when nothing matched: tolerate misspellings.
        collector = _CorrectingCollector(text, pos)
        with collector.save_state():
            self._process(collector)
        return collector.finalize(derive_common_prefix=derive_common_prefix)

    @abc.abstractmethod
    def _process(self, collector: CompletionCollector, /):
        """
        Generate completions and add them to the given collector.

        Implementing this class is straightforward, just feed all possible
        completions to the collector. For example, let's implement a completer
        for environment variables:

        .. code-block:: python

            class EnvVarCompleter(Completer):
                def _process(self, collector: CompletionCollector):
                    for var in os.environ.keys():
                        collector.add(var)

        """

        raise NotImplementedError()

    def _get_completion_model(
        self, *, is_many: bool = False
    ) -> _OptionSerializer.Model:
        """
        Internal, do not use.

        """

        return _OptionSerializer.CustomCompleter(self)

823 

824 

class Empty(Completer):
    """
    A completer that never produces any completions.

    """

    def _process(self, collector: CompletionCollector):
        # Intentionally adds nothing to the collector.
        return None

    def _get_completion_model(
        self, *, is_many: bool = False
    ) -> _OptionSerializer.Model:
        model = _OptionSerializer.Model()
        return model

838 

839 

840@dataclass(frozen=True, slots=True) 

841class Option: 

842 """ 

843 A single completion option for the :class:`Choice` completer. 

844 

845 """ 

846 

847 completion: str 

848 """ 

849 This string will replace an element that is being completed. 

850 

851 """ 

852 

853 comment: str | None = None 

854 """ 

855 Short comment displayed alongside the completion. 

856 

857 """ 

858 

859 

class Choice(Completer):
    """
    Offers completions taken from a fixed collection of options.

    :param choices:
        options to choose completion from.

    """

    def __init__(self, choices: _t.Collection[Option], /):
        self._choices: _t.Collection[Option] = choices

    def _process(self, collector: CompletionCollector, /):
        # The collector decides which options match the entered text.
        for option in self._choices:
            collector.add(option.completion, comment=option.comment)

    def _get_completion_model(
        self, *, is_many: bool = False
    ) -> _OptionSerializer.Model:
        has_comments = any(option.comment for option in self._choices)
        if not has_comments:
            return _OptionSerializer.Choice(
                [option.completion for option in self._choices]
            )

        # At least one option has a comment: use the model with descriptions,
        # substituting an empty string where a comment is missing.
        return _OptionSerializer.ChoiceWithDesc(
            [(option.completion, option.comment or "") for option in self._choices]
        )

887 

888 

class Alternative(Completer):
    """
    Combines completions produced by several completers.

    :param completers:
        list of inner completers.

        This is a list of tuples. First tuple element is a description of a completion
        group. It will be displayed when this completer is used in shells
        that support it (namely, ZSH). Second tuple element is the inner completer
        itself.

    """

    def __init__(self, completers: list[tuple[str, Completer]], /):
        self._completers = completers

    def _process(self, collector: CompletionCollector, /):
        # Group descriptions are only used by `_get_completion_model`.
        for _description, inner in self._completers:
            # Isolate each inner completer so that its changes to the collector
            # state do not leak into the next one.
            # NOTE(review): `add_group` runs inside `save_state`, so the group
            # counter it bumps is rolled back on exit -- confirm this is intended.
            with collector.save_state():
                collector.add_group()
                inner._process(collector)

    def _get_completion_model(
        self, *, is_many: bool = False
    ) -> _OptionSerializer.Model:
        models = [
            (description, inner._get_completion_model(is_many=is_many))
            for description, inner in self._completers
        ]
        return _OptionSerializer.Alternative(models)

921 

922 

class List(Completer):
    """
    Completes a list of elements separated by a delimiter.

    :param inner:
        completer for list items.
    :param delimiter:
        a character that separates list items. :data:`None` separates by any whitespace
        character, similar to :meth:`str.split`.
    :param allow_duplicates:
        whether to show completions that already appear in the list.
    :raises ValueError:
        if `delimiter` is an empty string.

    """

    def __init__(
        self,
        inner: Completer,
        /,
        *,
        delimiter: str | None = None,
        allow_duplicates: bool = False,
    ):
        if delimiter == "":
            raise ValueError("empty delimiter")

        self._inner = inner
        self._delimiter = delimiter
        self._allow_duplicates = allow_duplicates

    def _process(self, collector: CompletionCollector, /):
        separator = self._delimiter

        # Narrow the collector down to the list element under the cursor.
        collector.split_off_prefix(separator)
        collector.split_off_suffix(separator)
        collector.rsuffix = separator or " "
        collector.rsymbols += separator or string.whitespace

        if self._allow_duplicates:
            collector.dedup_words = frozenset()
        else:
            # Every other element of the list is excluded from completions,
            # except for the text of the element that is being completed.
            seen = {
                *collector.iprefix.split(separator),
                *collector.isuffix.split(separator),
            }
            seen.discard(collector.text)
            collector.dedup_words = frozenset(seen)

        self._inner._process(collector)

    def _get_completion_model(
        self, *, is_many: bool = False
    ) -> _OptionSerializer.Model:
        model_type = _OptionSerializer.ListMany if is_many else _OptionSerializer.List
        return model_type(self._delimiter or " ", self._inner._get_completion_model())

981 

982 

class Tuple(Completer):
    """
    Completes a value-separated tuple of elements.

    :param inner:
        completers for each tuple element.
    :param delimiter:
        a character that separates list items. :data:`None` separates by any whitespace
        character, similar to :meth:`str.split`.
    :raises ValueError:
        if `delimiter` is an empty string.

    """

    def __init__(self, *inner: Completer, delimiter: str | None = None):
        self._inner = inner
        if delimiter == "":
            raise ValueError("empty delimiter")
        self._delimiter = delimiter

    def _process(self, collector: CompletionCollector, /):
        # One-based position of the element under the cursor,
        # or zero if nothing was entered yet.
        pos = len(collector.prefix.split(self._delimiter))
        if (
            pos
            and self._delimiter is None
            and collector.prefix
            and collector.prefix[-1] in string.whitespace
        ):
            # `.split(None)` will trim whitespaces at the end.
            # Make sure we count those towards the current position in the tuple.
            pos += 1

        # Check bounds on the final zero-based index, so that a tuple with
        # no inner completers never reaches the lookup below.
        index = max(pos - 1, 0)
        if index >= len(self._inner):
            return

        collector.split_off_prefix(self._delimiter)
        collector.split_off_suffix(self._delimiter)
        collector.rsuffix = self._delimiter or " "
        collector.rsymbols += self._delimiter or string.whitespace

        self._inner[index]._process(collector)

    def _get_completion_model(
        self, *, is_many: bool = False
    ) -> _OptionSerializer.Model:
        if is_many:
            return _OptionSerializer.TupleMany(
                self._delimiter or " ",
                [inner._get_completion_model() for inner in self._inner],
            )
        else:
            return _OptionSerializer.Tuple(
                self._delimiter or " ",
                [inner._get_completion_model() for inner in self._inner],
            )

1037 

1038 

class File(Completer):
    """
    Completes paths to files and directories.

    :param extensions:
        allowed file extensions, should include the leading dot.

    """

    def __init__(self, extensions: str | _t.Collection[str] | None = None):
        if extensions is None:
            self._extensions = None
        elif isinstance(extensions, str):
            self._extensions = [extensions]
        else:
            self._extensions = list(extensions)

    @staticmethod
    def _entry_style(path: pathlib.Path) -> tuple[str | None, str]:
        # Pick color tag and display suffix for a non-directory entry.
        if path.is_symlink():
            return "symlink", "@"
        if path.is_file():
            if os.name == "nt":
                executable = path.suffix == ".exe"
            else:
                executable = os.access(path, os.X_OK)
            return ("exec", "*") if executable else ("file", "")
        if path.is_socket():
            return "socket", "="
        if path.is_fifo():
            return "pipe", "|"
        if path.is_block_device():
            return "block_device", "#"
        if path.is_char_device():
            return "char_device", "%"
        return None, ""

    def _process(self, collector: CompletionCollector, /):
        # Directory part of the entered path goes to `iprefix`,
        # only the last path component is being completed.
        head, tail = os.path.split(collector.prefix)
        if head and not head.endswith(os.path.sep):
            head += os.path.sep
        collector.iprefix += head
        collector.prefix = tail
        collector.suffix = collector.suffix.split(os.sep, maxsplit=1)[0]

        directory = pathlib.Path(head).expanduser().resolve()
        # Remembered because directories are completed without `rsuffix`,
        # and it has to be put back for files.
        file_rsuffix = collector.rsuffix

        if not directory.is_dir():
            return

        if tail.startswith("."):
            collector.rsuffix = ""
            collector.add(os.path.curdir + os.path.sep, color_tag="dir")
            collector.add(os.path.pardir + os.path.sep, color_tag="dir")
        if tail.startswith("~"):
            collector.rsuffix = ""
            collector.add("~" + os.path.sep, color_tag="dir")

        try:
            for entry in directory.iterdir():
                if entry.is_dir():
                    if entry.is_symlink():
                        tag, mark = "symlink", "@"
                    else:
                        tag, mark = "dir", ""
                    collector.rsuffix = ""
                    collector.add(entry.name + os.sep, color_tag=tag, dsuffix=mark)
                    continue

                if self._extensions is not None and not any(
                    entry.name.endswith(ext) for ext in self._extensions
                ):
                    # File with an extension that is not allowed.
                    continue

                collector.rsuffix = file_rsuffix
                tag, mark = self._entry_style(entry)
                collector.add(entry.name, color_tag=tag, dsuffix=mark)
        except PermissionError:
            # Keep whatever was collected before access was denied.
            return

    def _get_completion_model(
        self, *, is_many: bool = False
    ) -> _OptionSerializer.Model:
        # Extensions are serialized without leading dots, joined by `|`.
        stripped = [extension.lstrip(".") for extension in self._extensions or []]
        return _OptionSerializer.File("|".join(stripped))

1125 

1126 

class Dir(File):
    """
    Completes paths to directories.

    """

    def __init__(self):
        # An empty list of allowed extensions matches no regular files,
        # so only directories are offered.
        no_extensions: list[str] = []
        super().__init__(no_extensions)

    def _get_completion_model(
        self, *, is_many: bool = False
    ) -> _OptionSerializer.Model:
        model = _OptionSerializer.Dir()
        return model

1140 

1141 

class _CustomCompleterRegistrar:
    # Hands out consecutive indices, starting from zero.

    def __init__(self) -> None:
        self._custom_completer_index = 0

    def _register_custom_completer(self) -> int:
        # Advance the counter and return the value it had before this call.
        self._custom_completer_index += 1
        return self._custom_completer_index - 1

1150 

1151 

class _OptionSerializer(_CustomCompleterRegistrar):
    """
    Serializes one CLI option into a single tab-separated line
    of completion data.

    The line consists of the command path, space-separated flags,
    description, escaped metavars, nargs, and the dumped completion model.
    The serializer also acts as a registrar that numbers custom completers
    encountered while dumping the model.

    """

    # Characters that would break the tab/newline-delimited output.
    # Each of them is replaced with a single space.
    _SPECIAL_SYMBOLS = str.maketrans(dict.fromkeys("\r\n\a\b\t", " "))

    def __init__(
        self,
        flags: list[str],
        path: str,
        nargs: str | int,
        metavar: str | tuple[str, ...],
        help: str | yuio.Disabled,
    ):
        super().__init__()

        self._flags = flags
        self._path = path
        self._nargs = nargs
        # Metavars are always stored as a tuple, even if there's only one.
        self._metavar = metavar if isinstance(metavar, tuple) else (metavar,)
        self._help: str | yuio.Disabled = help

    def dump(self, model: _OptionSerializer.Model):
        """
        Render the option together with its completion `model`
        as one tab-separated line.

        """
        if self._help is yuio.DISABLED:
            # Marker that is written instead of a description
            # when help is disabled.
            desc = "__yuio_hide__"
        else:
            desc = self._process_help(self._help)

        compspec = [
            self._path,
            " ".join(self._flags),
            desc,
            # Metavars are joined with spaces, so spaces and backslashes
            # inside of a metavar are escaped as `\S` and `\L`.
            " ".join(
                re.sub(
                    r"[\\ ]",
                    lambda s: "\\S" if s.group() == " " else "\\L",
                    str(m),
                )
                or ""
                for m in self._metavar
            ),
            str(self._nargs),
            *model.dump(self),
        ]

        return "\t".join(item.translate(self._SPECIAL_SYMBOLS) for item in compspec)

    @staticmethod
    def _process_help(help: str):
        """
        Turn a help message into a short one-line description.

        The first letter is lowercased when the text looks like a regular
        sentence, only the first paragraph is kept, a single trailing period
        is removed (an ellipsis is kept), and color tags are stripped.

        """
        if (
            len(help) > 2
            and help[0].isupper()
            and (help[1].islower() or help[1].isspace())
        ):
            help = help[0].lower() + help[1:]
        # Cut the first paragraph *before* trimming the period; otherwise
        # the period is removed from the end of the last paragraph, and the
        # summary of a multi-paragraph help keeps its period.
        if (index := help.find("\n\n")) != -1:
            help = help[:index]
        if help.endswith(".") and not help.endswith(".."):
            help = help[:-1]
        return yuio.string.strip_color_tags(help)

    @staticmethod
    def _dump_nested(compspec: _t.Iterable[object], s: _OptionSerializer) -> list[str]:
        """
        Flatten every item of `compspec` into a single list of strings.

        """
        contents = []

        for item in compspec:
            contents.extend(_OptionSerializer._dump_nested_item(item, s))

        return contents

    @staticmethod
    def _dump_nested_item(item: object, s: _OptionSerializer) -> list[str]:
        """
        Flatten a single value into a list of strings.

        Models are dumped via their own `dump`; lists are prefixed with their
        length; tuples are flattened without a prefix; anything else
        is converted with `str`.

        """
        contents = []

        if isinstance(item, _OptionSerializer.Model):
            contents.extend(item.dump(s))
        elif isinstance(item, list):
            contents.append(str(len(item)))
            for sub_item in item:
                contents.extend(_OptionSerializer._dump_nested_item(sub_item, s))
        elif isinstance(item, tuple):
            for sub_item in item:
                contents.extend(_OptionSerializer._dump_nested_item(sub_item, s))
        else:
            contents.append(str(item))

        return contents

    @dataclass
    class Model:
        """
        Base completion model; dumps as its tag, the number of content items,
        and the flattened dataclass fields.

        """

        # Tag that identifies model kind in the dumped data. Subclasses set it
        # through the `tag` class keyword argument.
        tag: typing.ClassVar[str] = "-"

        def __init_subclass__(cls, tag: str = "-", **kwargs):
            super().__init_subclass__(**kwargs)
            cls.tag = tag

        def dump(self, s: _OptionSerializer) -> list[str]:
            """
            Return ``[tag, number of items, *items]`` for this model.

            """
            contents = _OptionSerializer._dump_nested(
                (getattr(self, field.name) for field in dataclasses.fields(self)), s
            )
            return [self.tag, str(len(contents)), *contents]

        def get_completer_at_index(
            self, s: _CustomCompleterRegistrar, index: int
        ) -> Completer | None:
            """
            Find a custom completer that was registered under `index`.

            The base model contains no custom completers and returns `None`.

            """
            return None

    @dataclass
    class File(Model, tag="f"):
        # String of extensions passed by the file completer.
        ext: str

    @dataclass
    class Dir(Model, tag="d"):
        pass

    @dataclass
    class Choice(Model, tag="c"):
        choices: list[str]

        def dump(self, s: _OptionSerializer) -> list[str]:
            return [self.tag, str(len(self.choices)), *self.choices]

    @dataclass
    class ChoiceWithDesc(Model, tag="cd"):
        # Pairs of `(choice, description)`.
        choices: list[tuple[str, str]]

        def dump(self, s: _OptionSerializer) -> list[str]:
            # All choices go first, then all processed descriptions,
            # hence the doubled item count.
            return [
                self.tag,
                str(len(self.choices) * 2),
                *[c[0] for c in self.choices],
                *[s._process_help(c[1]) for c in self.choices],
            ]

    @dataclass
    class Git(Model, tag="g"):
        modes: str

    @dataclass
    class List(Model, tag="l"):
        delim: str
        inner: _OptionSerializer.Model

        def get_completer_at_index(
            self, s: _CustomCompleterRegistrar, index: int
        ) -> Completer | None:
            return self.inner.get_completer_at_index(s, index)

    @dataclass
    class ListMany(List, tag="lm"):
        pass

    @dataclass
    class Tuple(Model, tag="t"):
        delim: str
        inner: list[_OptionSerializer.Model]

        def get_completer_at_index(
            self, s: _CustomCompleterRegistrar, index: int
        ) -> Completer | None:
            # Return the first match among nested models.
            for inner in self.inner:
                if completer := inner.get_completer_at_index(s, index):
                    return completer
            return None

    @dataclass
    class TupleMany(Tuple, tag="tm"):
        pass

    @dataclass
    class Alternative(Model, tag="a"):
        # Pairs of a string and a nested model.
        alternatives: list[tuple[str, _OptionSerializer.Model]]

        def get_completer_at_index(
            self, s: _CustomCompleterRegistrar, index: int
        ) -> Completer | None:
            # Return the first match among nested models.
            for _, inner in self.alternatives:
                if completer := inner.get_completer_at_index(s, index):
                    return completer
            return None

    @dataclass
    class CustomCompleter(Model, tag="cc"):
        completer: Completer

        def dump(self, s: _OptionSerializer) -> list[str]:
            # The completer itself is not serialized. Instead, we dump a JSON
            # object with option's path, flags, and an index taken from
            # the serializer's registrar.
            return [
                self.tag,
                "1",
                json.dumps(
                    {
                        "path": s._path,
                        "flags": s._flags,
                        "index": s._register_custom_completer(),
                    }
                ),
            ]

        def get_completer_at_index(
            self, s: _CustomCompleterRegistrar, index: int
        ) -> Completer | None:
            # Every visited custom completer takes the next index from
            # the registrar, same as `dump` does.
            this_index = s._register_custom_completer()
            if index == this_index:
                return self.completer
            else:
                return None

1355 

1356 

class _ProgramSerializer:
    """
    Collects completion data lines for a program and its subcommands.

    Options are serialized as they are added; subcommands get their own
    nested serializers whose lines are appended after the lines
    of their parent.

    """

    def __init__(self, path: str = "") -> None:
        self._path = path
        # Serialized options of this command, in order of addition.
        self._lines: list[str] = []
        # Number of positional arguments added so far.
        self._positionals = 0
        # Maps subcommand name to `(serializer, is_alias, help)`.
        self._subcommands: dict[
            str, tuple[_ProgramSerializer, bool, str | yuio.Disabled]
        ] = {}

    def add_option(
        self,
        flags: list[str] | yuio.Positional,
        nargs: str | int,
        metavar: str | tuple[str, ...],
        help: str | yuio.Disabled,
        completer: Completer | None,
        is_many: bool,
    ):
        """
        Serialize an option and add it to this command.

        Positional arguments get their sequential number instead of flags.
        Options without a completer are serialized with the base model.

        """
        if flags is yuio.POSITIONAL:
            flags = [str(self._positionals)]
            self._positionals += 1
        if completer is None:
            model = _OptionSerializer.Model()
        else:
            model = completer._get_completion_model(is_many=is_many)
        self._add_option(flags, nargs, metavar, help, model)

    def _serialize_option(
        self,
        flags: list[str],
        nargs: str | int,
        metavar: str | tuple[str, ...],
        help: str | yuio.Disabled,
        model: _OptionSerializer.Model,
    ) -> str:
        # Render a single option of this command into a completion data line.
        return _OptionSerializer(flags, self._path, nargs, metavar, help).dump(model)

    def _add_option(
        self,
        flags: list[str],
        nargs: str | int,
        metavar: str | tuple[str, ...],
        help: str | yuio.Disabled,
        model: _OptionSerializer.Model,
    ):
        self._lines.append(self._serialize_option(flags, nargs, metavar, help, model))

    def add_subcommand(
        self,
        name: str,
        is_alias: bool,
        help: str | yuio.Disabled,
    ):
        """
        Register a subcommand and return a serializer for its options.

        Path of the nested serializer is this serializer's path
        followed by a slash and `name`.

        """
        serializer = _ProgramSerializer(f"{self._path}/{name}")
        self._subcommands[name] = (serializer, is_alias, help)
        return serializer

    def _dump(self):
        """
        Return lines of this command followed by lines of all its subcommands.

        """
        # Work on a copy: the subcommand option and nested lines are derived
        # data, storing them in `self._lines` would duplicate them
        # on every subsequent dump.
        lines = list(self._lines)

        if self._subcommands:
            # Subcommand name is completed as a pseudo-option with flag `c`;
            # aliases and subcommands with disabled help are not offered.
            lines.append(
                self._serialize_option(
                    ["c"],
                    1,
                    "<subcommand>",
                    "Subcommand.",
                    _OptionSerializer.ChoiceWithDesc(
                        [
                            (name, help)
                            for name, (_, is_alias, help) in self._subcommands.items()
                            if not is_alias and help is not yuio.DISABLED
                        ]
                    ),
                )
            )

        for serializer, _, _ in self._subcommands.values():
            lines.extend(serializer._dump())

        return lines

    def dump(self):
        """
        Return all collected completion data as newline-separated text.

        """
        return "\n".join(self._dump())

1429 

1430 

# Translation table that replaces every ASCII punctuation and whitespace
# character with an underscore.
_PROG_ESCAPE = {
    ord(char): ord("_") for char in string.punctuation + string.whitespace
}

1435 

1436 

def _run_completer_at_index(completer: Completer, is_many: bool, index: int, word: str):
    """
    Run completion for `word` and print results to the original stdout.

    Nothing is printed unless the completion model of `completer` contains
    a custom completer registered under `index`. Every completion is printed
    on its own line as the completed text, a tab, and a comment (if any).

    """
    model = completer._get_completion_model(is_many=is_many)
    found = model.get_completer_at_index(_CustomCompleterRegistrar(), index)
    if not found:
        return

    # NOTE(review): the completer found at `index` only serves as an existence
    # check; completions are produced by the outer `completer`. Confirm that
    # this is intended.
    #
    # It's up to user's shell to do corrections and derive common prefix.
    results = completer.complete(
        word, len(word), do_corrections=False, derive_common_prefix=False
    )
    for item in results:
        line = f"{item.iprefix}{item.completion}{item.isuffix}\t{item.comment or ''}"
        print(line, file=sys.__stdout__)

1451 

1452 

def _write_completions(compdata: str, prog: str | None = None, shell: str = "all"):
    """
    Install or uninstall shell completions for the program.

    :param compdata:
        serialized completion data, written to a ``.compdata.tsv`` file
        in the user's data directory.
    :param prog:
        program name; defaults to the stem of ``sys.argv[0]``.
    :param shell:
        one of ``"all"``, ``"bash"``, ``"zsh"``, ``"fish"``, ``"pwsh"``
        to install completions for the given shells, or ``"uninstall"``
        to remove completions for all shells.

    Exits the process with code 1 if the program was started as a module
    or if the program name is empty or contains forbidden characters.

    """
    import yuio.io

    # `true_prog` is the command that user types; `prog` is its escaped form
    # that is used in file names and passed to completion scripts.
    true_prog = prog or pathlib.Path(sys.argv[0]).stem
    prog = true_prog.translate(_PROG_ESCAPE)

    if pathlib.Path(sys.argv[0]).stem == "__main__":
        yuio.io.failure(
            "You've invoked this program as a python module, most likely with "
            "`python -m <module>`. For completions to work, the program "
            "must be invoked as a command in your `$PATH`"
        )
        sys.exit(1)
    if not prog:
        yuio.io.failure("Failed to generate completion because program name is empty")
        sys.exit(1)
    if not re.match(r"^[a-zA-Z0-9_-]+$", prog):
        yuio.io.failure(
            "Failed to generate completion due to "
            "forbidden characters in program name: `%r`",
            prog,
        )
        sys.exit(1)

    if shell == "uninstall":
        shell = "all"
        yuio.io.heading("Uninstalling completions for `%s`", true_prog)
        install = False
    else:
        yuio.io.heading("Generating completions for `%s`", true_prog)
        install = True

    if not shutil.which(true_prog):
        yuio.io.warning(
            "Program `%s` is not in your `$PATH`. Completions might not be able "
            "to initialize",
            true_prog,
        )

    if os.name == "nt":
        # On Windows, everything lives under the local application data.
        data_home = cache_home = config_home = pathlib.Path(
            os.environ.get("LOCALAPPDATA") or (pathlib.Path.home() / "AppData/Local")
        )
    else:
        data_home = pathlib.Path(
            os.environ.get("XDG_DATA_HOME") or (pathlib.Path.home() / ".local/share")
        )
        cache_home = pathlib.Path(
            os.environ.get("XDG_CACHE_HOME") or (pathlib.Path.home() / ".cache")
        )
        config_home = pathlib.Path(
            os.environ.get("XDG_CONFIG_HOME") or (pathlib.Path.home() / ".config")
        )

    compdata_path = data_home / f"yuio/{prog}.compdata.tsv"

    if install:
        task_heading = "Installing completions"
    else:
        task_heading = "Uninstalling completions"

    # Shell identifier, task comment, and the function that handles the shell.
    writers = [
        ("bash", "Bash", _write_bash_script),
        ("zsh", "Zsh", _write_zsh_script),
        ("fish", "Fish", _write_fish_script),
        ("pwsh", "PowerShell", _write_pwsh_script),
    ]

    with yuio.io.Task(task_heading) as t:
        if install:
            os.makedirs(data_home / "yuio", exist_ok=True)
            compdata_path.write_text(compdata)
            yuio.io.info(
                "Wrote <c note>completion data</c> to <c path>%s</c>", compdata_path
            )
        elif compdata_path.exists():
            os.remove(compdata_path)
            yuio.io.info("Removed <c path>%s</c>", compdata_path)

        for shell_name, title, write_script in writers:
            if shell in ("all", shell_name):
                t.comment(title)
                write_script(
                    prog,
                    true_prog,
                    install,
                    compdata_path,
                    data_home,
                    cache_home,
                    config_home,
                )

    yuio.io.success("All done! Please restart your shell for changes to take effect.")
    if install:
        # Show the real command name here: the escaped name has punctuation
        # replaced with underscores and might not be a runnable command.
        yuio.io.info(
            "Run `%s --completions uninstall` to undo all changes.", true_prog
        )

1573 

1574 

def _write_bash_script(
    prog: str,
    true_prog: str,
    install: bool,
    compdata_path: pathlib.Path,
    data_home: pathlib.Path,
    cache_home: pathlib.Path,
    config_home: pathlib.Path,
):
    """
    Install or remove the Bash completion script for `true_prog`.

    The script is placed into the user's bash-completion directory, which is
    requested from a login Bash shell; if that fails, a directory under
    `data_home` is used. Does nothing on Windows, or when installing while
    `bash` is not available.

    """
    import yuio.exec
    import yuio.io

    if os.name == "nt":
        yuio.io.warning(
            "Skipped <c note>Bash</c>: completion script doesn't support windows"
        )
        return

    if install and not shutil.which("bash"):
        yuio.io.warning("Skipped <c note>Bash</c>: `bash` command is not available")
        return

    query = 'echo -n "${BASH_COMPLETION_USER_DIR:-${XDG_DATA_HOME:-$HOME/.local/share}/bash-completion}/completions/"'
    try:
        # Login shell can print extra lines, we only need the last one.
        completions_dir = yuio.exec.exec("bash", "-lc", query).splitlines()[-1]
    except (subprocess.CalledProcessError, IndexError, FileNotFoundError):
        completions_dir = data_home / "bash-completion/completions/"
    target = pathlib.Path(completions_dir) / true_prog

    if not install:
        if target.exists():
            os.remove(target)
            yuio.io.info("Removed <c path>%s</c>", target)
        return

    _write_script(target, "complete.bash", prog, true_prog, compdata_path)
    yuio.io.info("Wrote <c note>Bash</c> script to <c path>%s</c>", target)

1614 

1615 

def _write_zsh_script(
    prog: str,
    true_prog: str,
    install: bool,
    compdata_path: pathlib.Path,
    data_home: pathlib.Path,
    cache_home: pathlib.Path,
    config_home: pathlib.Path,
):
    """
    Install or remove the Zsh completion script for `true_prog`.

    The script lives in ``zsh/completions`` under `data_home`. On install,
    this directory is added to `fpath` via ``.zprofile`` unless a login Zsh
    already reports it. Whenever the script is written or removed, known
    completion caches are deleted. Does nothing on Windows, or when
    installing while `zsh` is not available.

    """
    import yuio.exec
    import yuio.io

    if os.name == "nt":
        yuio.io.warning(
            "Skipped <c note>Zsh</c>: completion script doesn't support windows"
        )
        return

    if install and not shutil.which("zsh"):
        yuio.io.warning("Skipped <c note>Zsh</c>: `zsh` command is not available")
        return

    def last_line_from_zsh(command: str) -> str:
        # Login shell can print extra lines, we only need the last one.
        return yuio.exec.exec("zsh", "-lc", command).splitlines()[-1]

    completions_dir = data_home / "zsh/completions"
    if not completions_dir.exists():
        completions_dir.mkdir(parents=True)
        # Completions home needs rwxr-xr-x, otherwise zsh will not load
        # our completion scripts.
        completions_dir.chmod(mode=0o755)

    target = completions_dir / ("_" + true_prog)

    # Cache only needs cleaning when the set of scripts actually changes.
    cache_is_stale = False
    if install:
        cache_is_stale = True
        _write_script(target, "complete.zsh", prog, true_prog, compdata_path)
        yuio.io.info("Wrote <c note>Zsh</c> script to <c path>%s</c>", target)
    elif target.exists():
        cache_is_stale = True
        os.remove(target)
        yuio.io.info("Removed <c path>%s</c>", target)

    try:
        fpath = last_line_from_zsh("echo -n $FPATH").split(":")
    except (subprocess.CalledProcessError, IndexError, FileNotFoundError):
        fpath = []

    try:
        zhome = last_line_from_zsh("echo -n ${ZDOTDIR:-$HOME}")
    except (subprocess.CalledProcessError, IndexError, FileNotFoundError):
        zhome = pathlib.Path.home()
    zhome = pathlib.Path(zhome)

    zprofile_path = zhome / ".zprofile"
    fpath_snippet = f"\nfpath=({completions_dir} $fpath)\n"

    if install:
        if str(completions_dir) not in fpath:
            with open(zprofile_path, "a") as zprofile:
                zprofile.write(fpath_snippet)
            yuio.io.info(
                "<c note>Note:</c> modified <c path>%s</c> to add <c path>%s</c> to `fpath`",
                zprofile_path,
                completions_dir,
            )
    elif zprofile_path.exists():
        if fpath_snippet in zprofile_path.read_text():
            yuio.io.info(
                "<c note>Note:</c> modifications to <c path>%s</c> are not removed"
                " because other completions might rely on them",
                zprofile_path,
            )

    if not cache_is_stale:
        return

    # Try to remove completions cache from the most common places.
    for cache_dir in (zhome, cache_home / "prezto"):
        if not (cache_dir.exists() and cache_dir.is_dir()):
            continue
        for entry in cache_dir.iterdir():
            if entry.is_file() and re.match(r"^\.?zcompdump", entry.name):
                os.remove(entry)
                yuio.io.info(
                    "<c note>Note:</c> deleted Zsh completions cache at <c path>%s</c>",
                    entry,
                )

    try:
        # Run zsh with the right flags in case zshrc runs compinit.
        # If after generating completions user runs `zsh` without the `-l` flag,
        # our changes to fpath will not be visible, and compinit will dump
        # an invalid version of cache. To avoid this, we call zsh ourselves
        # before the user has a chance to do it. Notice, though, that we don't
        # run `compdump`. This is because we can't be sure that the user uses
        # the default cache path (~/.zcompdump).
        yuio.exec.exec("zsh", "-lc", "true")
    except (subprocess.CalledProcessError, FileNotFoundError):
        pass

1730 

1731 

def _write_fish_script(
    prog: str,
    true_prog: str,
    install: bool,
    compdata_path: pathlib.Path,
    data_home: pathlib.Path,
    cache_home: pathlib.Path,
    config_home: pathlib.Path,
):
    """
    Install or remove the Fish completion script for `true_prog`.

    The script is stored as ``<true_prog>.fish`` in
    ``fish/vendor_completions.d`` under `data_home`. Does nothing
    on Windows, or when installing while `fish` is not available.

    """
    import yuio.io

    if os.name == "nt":
        yuio.io.warning(
            "Skipped <c note>Fish</c>: completion script doesn't support windows"
        )
        return

    if install and not shutil.which("fish"):
        yuio.io.warning("Skipped <c note>Fish</c>: `fish` command is not available")
        return

    target = data_home / "fish/vendor_completions.d" / (true_prog + ".fish")

    if not install:
        if target.exists():
            os.remove(target)
            yuio.io.info("Removed <c path>%s</c>", target)
        return

    _write_script(target, "complete.fish", prog, true_prog, compdata_path)
    yuio.io.info("Wrote <c note>Fish</c> script to <c path>%s</c>", target)

1762 

1763 

def _write_pwsh_script(
    prog: str,
    true_prog: str,
    install: bool,
    compdata_path: pathlib.Path,
    data_home: pathlib.Path,
    cache_home: pathlib.Path,
    config_home: pathlib.Path,
):
    """
    Install or remove the PowerShell completion script for `true_prog`.

    The script is stored in ``yuio/pwsh`` under `data_home` along with
    a loader script. On install, user's `$PROFILE` is modified to run
    the loader, unless PowerShell reports that the loader for this directory
    is already active. Skipped with a warning if neither `pwsh`
    nor `powershell` is available, if version or profile path
    can't be determined, or if PowerShell is older than 5.

    """
    import yuio.exec
    import yuio.io

    if shutil.which("pwsh"):
        command = "pwsh"
    elif shutil.which("powershell"):
        command = "powershell"
    else:
        yuio.io.warning(
            "Skipped <c note>PowerShell</c>: `pwsh` command is not available"
        )
        return

    try:
        pwsh_data = (
            yuio.exec.exec(
                command,
                "-Command",
                'Write-Host "$($PSVersionTable.PSVersion);$PROFILE"',
            )
            .splitlines()[-1]
            .strip()
        )
    except (subprocess.CalledProcessError, IndexError) as e:
        yuio.io.warning(
            "Skipped <c note>PowerShell</c>: failed to get powershell `$PROFILE` path: %s",
            e,
        )
        return
    except FileNotFoundError:
        yuio.io.warning("Skipped <c note>PowerShell</c>: `pwsh` command not found")
        return

    # Version can carry a pre-release or build label, e.g. `7.5.0-preview.2`.
    # We only need its numeric part, the label is skipped.
    if match := re.match(r"^(\d+(?:\.\d+)*)(?:[-+][^;]*)?;(.*)$", pwsh_data):
        version = match.group(1)
        profile_s = match.group(2)
    else:
        yuio.io.warning(
            "Skipped <c note>PowerShell</c>: can't determine powershell version"
        )
        return
    if not profile_s:
        yuio.io.warning(
            "Skipped <c note>PowerShell</c>: powershell `$PROFILE` path is empty"
        )
        return
    # Compare against a one-element tuple so that versions with fewer than
    # three components (e.g. `5` or `5.0`) are not treated as older than 5.
    if tuple(int(v) for v in version.split(".")) < (5,):
        yuio.io.warning(
            "Skipped <c note>PowerShell</c>: completions script requires "
            "PowerShell 5 or newer, you have %s",
            version,
        )
        return

    profile_path = pathlib.Path(profile_s).expanduser().resolve()

    data_dir = data_home / "yuio/pwsh"
    loader_path = data_dir / "LoadCompletions.ps1"
    script_dest = data_dir / f"_{true_prog}.ps1"
    if install:
        _write_script(script_dest, "complete.ps1", prog, true_prog, compdata_path)
        yuio.io.info(
            "Wrote <c note>PowerShell</c> script to <c path>%s</c>", script_dest
        )
        _write_pwsh_loader(loader_path, data_dir)
    elif script_dest.exists():
        os.remove(script_dest)
        yuio.io.info("Removed <c path>%s</c>", script_dest)

    try:
        # Directories for which a loader is already active in user's shell.
        data_dirs = [
            pathlib.Path(f).expanduser().resolve()
            for f in yuio.exec.exec(
                command,
                "-Command",
                'Write-Host ($_YUIO_COMPL_V1_INIT_PATHS -join "`n")',
            )
            .strip()
            .splitlines()
        ]
    except (subprocess.CalledProcessError, FileNotFoundError):
        return

    pwsh_profile_append_text = f"\n. {loader_path}\n"

    if install:
        # Reported paths are resolved, so our path must be resolved as well,
        # otherwise the same directory might not compare equal.
        if data_dir.expanduser().resolve() not in data_dirs:
            # Profile directory is only needed when we modify the profile;
            # in particular, uninstalling should not create it.
            profile_path.parent.mkdir(exist_ok=True, parents=True)
            with open(profile_path, "a") as f:
                f.write(pwsh_profile_append_text)
            yuio.io.info(
                "<c note>Note:</c> modified <c path>%s</c> to call <c path>%s</c> on startup",
                profile_path,
                loader_path,
            )
    elif profile_path.exists():
        pwsh_profile_text = profile_path.read_text()
        if pwsh_profile_append_text in pwsh_profile_text:
            yuio.io.info(
                "<c note>Note:</c> modifications to <c path>%s</c> are not removed"
                " because other completions might rely on them",
                profile_path,
            )

1875 

1876 

def _write_script(
    path: pathlib.Path,
    script_name: str,
    prog: str,
    true_prog: str,
    compdata_path: pathlib.Path,
):
    """
    Render a completion script template and save it to `path`.

    Placeholders ``@prog@``, ``@true_prog@``, ``@data@`` and ``@version@``
    are replaced, in this order, with the given program names, path
    to the completion data, and Yuio version. Missing parent directories
    are created, and the file is made executable.

    """
    script = _read_script(script_name)
    substitutions = (
        ("@prog@", prog),
        ("@true_prog@", true_prog),
        ("@data@", str(compdata_path)),
        ("@version@", yuio.__version__),
    )
    for placeholder, value in substitutions:
        script = script.replace(placeholder, value)

    path.parent.mkdir(exist_ok=True, parents=True)
    path.write_text(script)
    path.chmod(0o755)

1896 

1897 

def _read_script(script_name: str):
    """
    Return text of a script template from the ``_complete`` directory
    that is bundled with Yuio.

    """
    import zipfile
    import zipimport

    if not isinstance(__loader__, zipimport.zipimporter):
        # Regular installation, templates are next to this module.
        template_path = pathlib.Path(__file__).parent / "_complete" / script_name
        return template_path.read_text()

    # Yuio is imported directly from a wheel.
    with zipfile.ZipFile(__loader__.archive) as wheel:
        raw_template = wheel.read("yuio/_complete/" + script_name)
    return raw_template.decode()

1910 

1911 

def _write_pwsh_loader(loader_path: pathlib.Path, data_dir: pathlib.Path):
    """
    Write the PowerShell loader script to `loader_path`.

    An existing loader is kept when its ``LOADER_VERSION`` is the same
    or newer than the version of the bundled template; a loader without
    a recognizable version is overwritten with a warning. The ``@data@``
    placeholder in the template is replaced with `data_dir`.

    """
    import yuio.io

    version_re = re.compile(r"^\s*#\s*LOADER_VERSION:\s*(\d+)\s*$", re.MULTILINE)

    # Read the template through the shared helper so that it is also found
    # when Yuio is imported directly from a wheel.
    loader_template = _read_script("complete_init.ps1")

    loader_version = version_re.search(loader_template)
    # Bundled template always declares its version.
    assert loader_version

    if loader_path.exists() and loader_path.is_file():
        current_loader = loader_path.read_text()
        current_version_s = version_re.search(current_loader)

        if current_version_s is None:
            yuio.io.warning(
                "<c note>Note:</c> can't determine version of <c path>%s</c>, "
                "file will be overridden",
                loader_path,
            )
        elif int(loader_version.group(1)) <= int(current_version_s.group(1)):
            # Installed loader is up to date.
            return

    loader_template = loader_template.replace("@data@", str(data_dir))
    # Don't rely on the caller to create the destination directory.
    loader_path.parent.mkdir(exist_ok=True, parents=True)
    loader_path.write_text(loader_template)
    loader_path.chmod(0o755)
    yuio.io.info("Wrote <c note>PowerShell</c> script to <c path>%s</c>", loader_path)