Coverage for yuio / complete.py: 78%
761 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-05 11:41 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-05 11:41 +0000
1# Yuio project, MIT license.
2#
3# https://github.com/taminomara/yuio/
4#
5# You're free to copy this file to your project and edit it for your needs,
6# just keep this copyright line please :3
8"""
9This module provides autocompletion functionality for widgets and CLI.
12Completer basics
13----------------
15All completers are derived from the :class:`Completer` base class
16with a simple interface:
18.. autoclass:: Completer
19 :members:
21.. autoclass:: Completion
22 :members:
25Completers
26----------
28Yuio provides basic completers that cover most of the cases:
30.. autoclass:: Empty
32.. autoclass:: Alternative
34.. autoclass:: Choice
36.. autoclass:: Option
37 :members:
39.. autoclass:: List
41.. autoclass:: Tuple
43.. autoclass:: File
45.. autoclass:: Dir
48Implementing your own completer
49-------------------------------
51To implement a custom completer, subclass :class:`Completer` and implement
52its :meth:`~Completer._process` method.
54.. note::
56 When using a custom completer for CLI flags in :mod:`yuio.app`,
57 completion script will invoke your program with special arguments
58 to run the completer and get its result.
60.. class:: Completer
61 :noindex:
63 .. automethod:: _process
65.. autoclass:: CompletionCollector
67"""
69from __future__ import annotations
71import abc
72import contextlib
73import dataclasses
74import functools
75import json
76import math
77import os
78import pathlib
79import re
80import shutil
81import string
82import subprocess
83import sys
84from dataclasses import dataclass
86import yuio
87import yuio.string
89import typing
90import yuio._typing_ext as _tx
91from typing import TYPE_CHECKING
93if TYPE_CHECKING:
94 import typing_extensions as _t
95else:
96 from yuio import _typing as _t
98__all__ = [
99 "Alternative",
100 "Choice",
101 "Completer",
102 "Completion",
103 "CompletionCollector",
104 "Dir",
105 "Empty",
106 "File",
107 "List",
108 "Option",
109 "Tuple",
110]
113@dataclass(frozen=True, slots=True)
114@functools.total_ordering
115class Completion:
116 """
117 A single completion.
119 """
121 iprefix: str
122 """
123 See :class:`CompletionCollector.iprefix` for details.
125 """
127 completion: str
128 """
129 Text of the completion.
131 """
133 rsuffix: str
134 """
135 See :class:`CompletionCollector.rsuffix` for details.
137 """
139 rsymbols: str
140 """
141 See :class:`CompletionCollector.rsymbols` for details.
143 """
145 isuffix: str
146 """
147 See :class:`CompletionCollector.isuffix` for details.
149 """
151 comment: str | None
152 """
153 Short comment displayed alongside the completion.
155 """
157 dprefix: str
158 """
159 Prefix that will be displayed before :attr:`~Completion.completion`
160 when listing completions, but will not be inserted once completion
161 is applied.
163 """
165 dsuffix: str
166 """
167 Like :attr:`~Completion.dprefix`, but it's a suffix.
169 """
171 group_id: _tx.SupportsLt[_t.Any] = dataclasses.field(repr=False)
172 """
173 Group id, used to sort completions.
175 Actual content of this property is an implementation detail.
177 """
179 group_color_tag: str | None
180 """
181 Color tag that's used when displaying this completion.
183 See :meth:`CompletionCollector.add_group` for details.
185 """
187 def __lt__(self, other: Completion) -> bool:
188 """
189 Completions are ordered by their groups and then alphabetically.
191 """
193 return self.group_id < other.group_id or (
194 self.group_id == other.group_id and self.completion < other.completion
195 )
198@dataclass(init=False, eq=False, repr=False, match_args=False)
199class CompletionCollector:
200 """
201 A class that collects completions as completers are running.
203 The text that is being completed is split into four parts, similar
204 to what you might see in ZSH completion widgets. The main two are:
206 .. autoattribute:: prefix
208 .. autoattribute:: suffix
210 When completions are added to the collector, they are checked against
211 the current prefix to determine if they match the entered text. If they
212 do, the completion system will replace text from `prefix` and `suffix`
213 with the new completion string.
215 The two additional parts are:
217 .. autoattribute:: iprefix
219 .. autoattribute:: isuffix
221 For example, suppose you're completing a second element
222 of a colon-separated list. The list completer will set up
223 the collector so that `prefix` and `suffix` contain parts of the
224 current list element, while `iprefix` and `isuffix` contain
225 the rest of the elements:
227 .. code-block:: text
229 list_element_1:list_el|ement_2:list_element_3
230 └┬────────────┘└┬────┘│└┬────┘└┬────────────┘
231 iprefix prefix │ suffix isuffix
232 └ cursor
234 Now, if the completer adds a completion ``"list_elements"``,
235 this text will replace the `prefix` and `suffix`, but not `iprefix`
236 and `isuffix`. So, after the completion is applied, the string will
237 look like so:
239 .. code-block:: text
241 list_element_1:list_elements:list_element_3
242 └┬──────────┘
243 this got replaced
245 Finally, there is `rsuffix`:
247 .. autoattribute:: rsuffix
249 .. autoattribute:: rsymbols
251 So, when completing a colon-separated list, colons will be added and removed
252 automatically, similar to how ZSH does it.
254 .. autoattribute:: dedup_words
256 .. autoattribute:: full_prefix
258 .. autoattribute:: full_suffix
260 .. autoattribute:: text
262 .. autoattribute:: num_completions
264 .. automethod:: add
266 .. automethod:: add_group
268 .. automethod:: save_state
270 .. automethod:: split_off_prefix
272 .. automethod:: split_off_suffix
274 .. automethod:: finalize
276 """
278 iprefix: str
279 """
280 Contains text that goes before the :attr:`~CompletionCollector.prefix`.
282 This prefix is not considered when checking whether a completion
283 matches a text, and it is not replaced by the completion. It will also
284 not be shown in the table of completions.
286 This prefix starts empty, and then parts of :attr:`~CompletionCollector.prefix`
287 are moved to :attr:`~CompletionCollector.iprefix` as completers split it into
288 list elements.
290 """
292 prefix: str
293 """
294 Portion of the completed text before the cursor.
296 """
298 suffix: str
299 """
300 Portion of the completed text after the cursor.
302 """
304 rsuffix: str
305 """
306 Starts empty, and may be set to hold a list separator.
308 This suffix will be added after the completion. However, it will be automatically
309 removed if the user types one of :attr:`CompletionCollector.rsymbols`,
310 or moves cursor, or alters input in some other way.
312 This property is mutable and can be changed by completers.
314 """
316 rsymbols: str
317 """
318 If user types one of the symbols from this string,
319 :attr:`~.CompletionCollector.rsuffix` will be removed.
321 This property is mutable and can be changed by completers.
323 """
325 isuffix: str
326 """
327 Similar to :attr:`CompletionCollector.iprefix`, but for suffixes.
329 """
331 dedup_words: frozenset[str]
332 """
333 Completions from this set will not be added. This is useful
334 when completing lists of unique values.
336 This property is mutable and can be changed by completers.
338 """
340 # Internal fields.
341 _group_id: int
342 _group_sorted: bool
343 _group_color_tag: str | None
345 def __init__(self, text: str, pos: int, /):
346 self.iprefix = ""
347 self.prefix = text[:pos]
348 self.suffix = text[pos:]
349 self.rsuffix = ""
350 self.rsymbols = ""
351 self.isuffix = ""
352 self.dedup_words = frozenset()
354 self._group_id = 0
355 self._group_sorted = True
356 self._group_color_tag = None
358 self._completions: list[Completion] = []
360 @property
361 def full_prefix(self) -> str:
362 """
363 Portion of the final completed text that goes before the cursor.
365 """
367 return self.iprefix + self.prefix
369 @property
370 def full_suffix(self) -> str:
371 """
372 Portion of the final completed text that goes after the cursor.
374 """
376 return self.suffix + self.isuffix
378 @property
379 def text(self) -> str:
380 """
381 Portion of the text that is being autocompleted.
383 """
385 return self.prefix + self.suffix
387 @contextlib.contextmanager
388 def save_state(self):
389 """
390 Save current state of the collector, i.e. prefixes,
391 suffixes, etc., upon entering this context manager,
392 then restore state upon exiting.
394 Use this context manager when you need to call nested
395 completers more than once to prevent changes made in
396 one nested completer bleeding out into another
397 nested completer.
399 """
401 state = {f.name: getattr(self, f.name) for f in dataclasses.fields(self)}
403 try:
404 yield
405 finally:
406 for name, value in state.items():
407 setattr(self, name, value)
409 def add(
410 self,
411 completion: str,
412 /,
413 *,
414 comment: str | None = None,
415 dprefix: str = "",
416 dsuffix: str = "",
417 color_tag: str | None = None,
418 ):
419 """
420 Add a new completion.
422 :param completion:
423 completed text without :attr:`~CompletionCollector.iprefix`
424 and :attr:`~CompletionCollector.isuffix`. This text will replace
425 :attr:`~CompletionCollector.prefix` and :attr:`~CompletionCollector.suffix`.
426 :param comment:
427 additional comment that will be displayed near the completion.
428 :param color_tag:
429 allows overriding color tag from the group.
431 """
433 if (
434 completion
435 and completion not in self.dedup_words
436 and completion.startswith(self.prefix)
437 ):
438 self._add(
439 completion,
440 comment=comment,
441 dprefix=dprefix,
442 dsuffix=dsuffix,
443 color_tag=color_tag,
444 )
446 def _add(
447 self,
448 completion: str,
449 /,
450 *,
451 comment: str | None = None,
452 dprefix: str = "",
453 dsuffix: str = "",
454 color_tag: str | None = None,
455 ):
456 if not self.isuffix or self.isuffix[0] in string.whitespace:
457 # Only add `rsuffix` if we're at the end of an array element.
458 # Don't add `rsuffix` if we're in the middle of an array, unless the array
459 # is separated by spaces.
460 rsuffix = self.rsuffix
461 rsymbols = self.rsymbols
462 else:
463 rsuffix = ""
464 rsymbols = ""
466 if self._group_sorted:
467 group_id = (self._group_id, 0)
468 else:
469 group_id = (self._group_id, len(self._completions))
471 if color_tag is None:
472 color_tag = self._group_color_tag
474 self._completions.append(
475 Completion(
476 iprefix=self.iprefix,
477 completion=completion,
478 rsuffix=rsuffix,
479 rsymbols=rsymbols,
480 isuffix=self.isuffix,
481 comment=comment,
482 dprefix=dprefix,
483 dsuffix=dsuffix,
484 group_id=group_id,
485 group_color_tag=color_tag,
486 )
487 )
489 def add_group(self, /, *, sorted: bool = True, color_tag: str | None = None):
490 """
491 Add a new completions group.
493 All completions added after call to this method will be placed to the new group.
494 They will be grouped together, and colored according to the group's color tag.
496 :param sorted:
497 controls whether completions in the new group
498 should be sorted lexicographically.
499 :param color_tag:
500 which color tag should be used to display completions
501 and their help messages for this group.
503 See :attr:`yuio.widget.Option.color_tag` for details.
505 """
507 self._group_id += 1
508 self._group_sorted = sorted
509 self._group_color_tag = color_tag
511 @property
512 def num_completions(self) -> int:
513 """
514 Number of completions added so far.
516 """
518 return len(self._completions)
520 def split_off_prefix(self, delim: str | None = None, /):
521 """
522 Move everything up to the last occurrence of `delim`
523 from :attr:`~CompletionCollector.prefix`
524 to :attr:`~CompletionCollector.iprefix`.
526 :param delim:
527 delimiter to split off; :data:`None` value splits off on any whitespace
528 character, similar to :meth:`str.rsplit`.
530 """
532 delim = delim or " "
533 parts = self.prefix.rsplit(delim, maxsplit=1)
534 if len(parts) > 1:
535 self.iprefix += parts[0] + delim
536 self.prefix = parts[1]
538 def split_off_suffix(self, delim: str | None = None, /):
539 """
540 Move everything past the first occurrence of `delim`
541 from :attr:`~CompletionCollector.suffix`
542 to :attr:`~CompletionCollector.isuffix`.
544 :param delim:
545 delimiter to split off; :data:`None` value splits off on any whitespace
546 character, similar to :meth:`str.split`.
548 """
550 delim = delim or " "
551 parts = self.suffix.split(delim, maxsplit=1)
552 if len(parts) > 1:
553 self.suffix = parts[0]
554 self.isuffix = delim + parts[1] + self.isuffix
556 def finalize(self, *, derive_common_prefix: bool = True) -> list[Completion]:
557 """
558 Finish collecting completions and return everything that was collected.
560 Do not reuse a collector after it was finalized.
562 :returns:
563 list of completions, sorted by their groups and preferred ordering
564 within each group.
566 If all completions start with a common prefix, a single completion
567 is returned containing this prefix.
569 """
571 if len(self._completions) > 1:
572 c0 = self._completions[0]
574 iprefix = c0.iprefix
575 isuffix = c0.isuffix
577 if (
578 derive_common_prefix
579 and self.full_prefix.startswith(iprefix)
580 and self.full_suffix.endswith(isuffix)
581 and all(
582 c.iprefix == iprefix and c.isuffix == isuffix
583 for c in self._completions
584 )
585 ):
586 # If all completions have the same `iprefix` and `isuffix`...
587 common_prefix = _commonprefix(
588 list(c.completion for c in self._completions)
589 )
590 if common_prefix and len(iprefix) + len(common_prefix) > len(
591 self.iprefix
592 ) + len(self.prefix):
593 # ...and they have a common prefix that is longer than what's entered so far,
594 # then complete this common prefix.
595 rsuffix = ""
596 rsymbols = ""
597 if all(
598 common_prefix == c.completion and rsuffix == c.rsuffix
599 for c in self._completions
600 ):
601 # If completing common prefix actually fulfills a completion, add `rsuffix` as well.
602 rsuffix = c0.rsuffix
603 rsymbols = c0.rsymbols
604 return [
605 Completion(
606 iprefix=iprefix,
607 completion=common_prefix,
608 rsuffix=rsuffix,
609 rsymbols=rsymbols,
610 isuffix=isuffix,
611 comment=None,
612 dprefix="",
613 dsuffix="",
614 group_id=(0, 0),
615 group_color_tag=None,
616 )
617 ]
619 self._completions.sort()
620 return self._completions
623_MAX_COMPLETION_CORRECTIONS: int = 1
624_MAX_COMPLETION_CORRECTIONS_RATE: float = 1 / 3
627@_t.final
628class _CorrectingCollector(CompletionCollector):
629 def __init__(self, text: str, pos: int):
630 super().__init__(text, pos)
632 self._has_corrections = False
634 def add(
635 self,
636 completion: str,
637 /,
638 *,
639 comment: str | None = None,
640 dprefix: str = "",
641 dsuffix: str = "",
642 color_tag: str | None = None,
643 ):
644 if not completion or completion in self.dedup_words:
645 return
647 a = self.prefix + self.suffix
648 b = completion
649 corrections = _corrections(a, b)
650 threshold = (
651 _MAX_COMPLETION_CORRECTIONS
652 + _MAX_COMPLETION_CORRECTIONS_RATE * (len(a) + len(b)) / 2
653 )
655 if corrections <= 1:
656 # this is a simple mistype, add it as usual
657 self._add(
658 completion,
659 comment=comment,
660 dprefix=dprefix,
661 dsuffix=dsuffix,
662 color_tag=color_tag,
663 )
664 elif corrections <= threshold:
665 # this is a correction, add it into corrections group
666 if comment:
667 comment = "corrected: " + comment
668 else:
669 comment = "corrected"
670 with self.save_state():
671 self._group_id = 0xFFFFFFFE # (big enough) - 1
672 self._group_color_tag = "corrected"
673 self._add(
674 completion,
675 comment=comment,
676 dprefix=dprefix,
677 dsuffix=dsuffix,
678 color_tag=color_tag,
679 )
680 self._has_corrections = True
682 def finalize(self, *, derive_common_prefix: bool = True) -> list[Completion]:
683 if self._has_corrections:
684 c0 = self._completions[0]
686 iprefix = ""
687 prefix = self.full_prefix
688 suffix = self.full_suffix
689 isuffix = ""
691 if prefix.startswith(c0.iprefix):
692 l = len(c0.iprefix)
693 iprefix = prefix[:l]
694 prefix = prefix[l:]
696 if suffix.endswith(c0.isuffix):
697 l = len(c0.isuffix)
698 isuffix = suffix[-l:]
699 suffix = suffix[:-l]
701 # If we have corrections, add original value to the end.
702 with self.save_state():
703 self._group_id = 0xFFFFFFFF # (big enough)
704 self._group_color_tag = "original"
705 self.iprefix = iprefix
706 self.isuffix = isuffix
707 self._add(prefix + suffix, comment="original")
709 self._completions.sort()
710 return self._completions
713def _corrections(a: str, b: str) -> float:
714 # Damerau–Levenshtein distance (Optimal String Alignment distance)
716 a = a.casefold()
717 b = b.casefold()
718 d = [x[:] for x in [[0.0] * (len(b) + 1)] * (len(a) + 1)]
719 for i in range(len(a) + 1):
720 d[i][0] = i
721 for j in range(len(b) + 1):
722 d[0][j] = j
723 for i in range(1, len(a) + 1):
724 for j in range(1, len(b) + 1):
725 d[i][j] = min(
726 # Add to `a`:
727 d[i - 1][j] + 1,
728 # Add to `b`:
729 d[i][j - 1] + 1,
730 # Replace:
731 d[i - 1][j - 1] + (a[i - 1] != b[j - 1]),
732 # Transpose:
733 (
734 d[i - 2][j - 2] + (a[i - 1] != b[j - 1])
735 if i > 2 and j > 2 and a[i - 2 : i] == b[j - 1 : j - 3 : -1]
736 else math.inf
737 ),
738 )
740 return d[-1][-1]
743def _commonprefix(m: list[str]) -> str:
744 if not m:
745 return ""
746 s1 = min(m)
747 s2 = max(m)
748 for i, c in enumerate(s1):
749 if c != s2[i]:
750 return s1[:i]
751 return s1
754class Completer(abc.ABC):
755 """
756 An interface for text completion providers.
758 """
760 def complete(
761 self,
762 text: str,
763 pos: int,
764 /,
765 *,
766 do_corrections: bool = True,
767 derive_common_prefix: bool = True,
768 ) -> list[Completion]:
769 """
770 Complete the given text at the given cursor position.
772 :param text:
773 text that is being completed.
774 :param pos:
775 position of the cursor in the text. ``0`` means the cursor
776 is before the first character, ``len(text)`` means the cursor
777 is after the last character.
778 :param do_corrections:
779 if :data:`True` (default), completion system will try to guess
780 if there are any misspells in the `text`, and offer to correct them.
781 :param derive_common_prefix:
782 if :data:`True` (default), and all returned completions have a non-empty
783 common prefix, return a single completion with this prefix instead.
784 :returns:
785 a sorted list of completions.
787 If all completions start with a common prefix, a single completion
788 is returned containing this prefix.
790 """
792 collector = CompletionCollector(text, pos)
793 with collector.save_state():
794 self._process(collector)
795 completions = collector.finalize()
796 if completions or not do_corrections:
797 return completions
799 collector = _CorrectingCollector(text, pos)
800 with collector.save_state():
801 self._process(collector)
802 return collector.finalize(derive_common_prefix=derive_common_prefix)
804 @abc.abstractmethod
805 def _process(self, collector: CompletionCollector, /):
806 """
807 Generate completions and add them to the given collector.
809 Implementing this class is straight forward, just feed all possible
810 completions to the collector. For example, let's implement a completer
811 for environment variables:
813 .. code-block:: python
815 class EnvVarCompleter(Completer):
816 def _process(self, collector: CompletionCollector):
817 for var in os.environ.keys():
818 collector.add(var)
820 """
822 raise NotImplementedError()
824 def _get_completion_model(
825 self, *, is_many: bool = False
826 ) -> _OptionSerializer.Model:
827 """
828 Internal, do not use.
830 """
832 return _OptionSerializer.CustomCompleter(self)
835class Empty(Completer):
836 """
837 An empty completer that returns no values.
839 """
841 def _process(self, collector: CompletionCollector):
842 pass # nothing to do
844 def _get_completion_model(
845 self, *, is_many: bool = False
846 ) -> _OptionSerializer.Model:
847 return _OptionSerializer.Model()
850@dataclass(frozen=True, slots=True)
851class Option:
852 """
853 A single completion option for the :class:`Choice` completer.
855 """
857 completion: str
858 """
859 This string will replace an element that is being completed.
861 """
863 comment: str | None = None
864 """
865 Short comment displayed alongside the completion.
867 """
870class Choice(Completer):
871 """
872 Completes input from a predefined list of completions.
874 :param choices:
875 options to choose completion from.
877 """
879 def __init__(self, choices: _t.Collection[Option], /):
880 self._choices: _t.Collection[Option] = choices
882 def _process(self, collector: CompletionCollector, /):
883 for choice in self._choices:
884 collector.add(choice.completion, comment=choice.comment)
886 def _get_completion_model(
887 self, *, is_many: bool = False
888 ) -> _OptionSerializer.Model:
889 if any(option.comment for option in self._choices):
890 return _OptionSerializer.ChoiceWithDesc(
891 [(option.completion, option.comment or "") for option in self._choices]
892 )
893 else:
894 return _OptionSerializer.Choice(
895 [option.completion for option in self._choices]
896 )
899class Alternative(Completer):
900 """
901 Joins outputs from multiple completers.
903 :param completers:
904 list of inner completers.
906 This is a list of tuples. First tuple element is a description of a completion
907 group. It will be displayed when this completer is used in shells
908 that support it (namely, ZSH). Second tuple element is the inner completer
909 itself.
911 """
913 def __init__(self, completers: list[tuple[str, Completer]], /):
914 self._completers = completers
916 def _process(self, collector: CompletionCollector, /):
917 for _, completer in self._completers:
918 with collector.save_state():
919 collector.add_group()
920 completer._process(collector)
922 def _get_completion_model(
923 self, *, is_many: bool = False
924 ) -> _OptionSerializer.Model:
925 return _OptionSerializer.Alternative(
926 [
927 (name, completer._get_completion_model(is_many=is_many))
928 for name, completer in self._completers
929 ]
930 )
933class List(Completer):
934 """
935 Completes a value-separated list of elements.
937 :param inner:
938 completer for list items.
939 :param delimiter:
940 a character that separates list items. :data:`None` separates by any whitespace
941 character, similar to :meth:`str.split`.
942 :param allow_duplicates:
943 whether to show completions that already appear in the list.
945 """
947 def __init__(
948 self,
949 inner: Completer,
950 /,
951 *,
952 delimiter: str | None = None,
953 allow_duplicates: bool = False,
954 ):
955 self._inner = inner
956 if delimiter == "":
957 raise ValueError("empty delimiter")
958 self._delimiter = delimiter
959 self._allow_duplicates = allow_duplicates
961 def _process(self, collector: CompletionCollector, /):
962 collector.split_off_prefix(self._delimiter)
963 collector.split_off_suffix(self._delimiter)
964 collector.rsuffix = self._delimiter or " "
965 collector.rsymbols += self._delimiter or string.whitespace
967 if not self._allow_duplicates:
968 dedup_words = set(
969 collector.iprefix.split(self._delimiter)
970 + collector.isuffix.split(self._delimiter)
971 )
972 if collector.text in dedup_words:
973 dedup_words.remove(collector.text)
974 collector.dedup_words = frozenset(dedup_words)
975 else:
976 collector.dedup_words = frozenset()
978 self._inner._process(collector)
980 def _get_completion_model(
981 self, *, is_many: bool = False
982 ) -> _OptionSerializer.Model:
983 if is_many:
984 return _OptionSerializer.ListMany(
985 self._delimiter or " ", self._inner._get_completion_model()
986 )
987 else:
988 return _OptionSerializer.List(
989 self._delimiter or " ", self._inner._get_completion_model()
990 )
993class Tuple(Completer):
994 """
995 Completes a value-separated tuple of elements.
997 :param inner:
998 completers for each tuple element.
999 :param delimiter:
1000 a character that separates list items. :data:`None` separates by any whitespace
1001 character, similar to :meth:`str.split`.
1003 """
1005 def __init__(self, *inner: Completer, delimiter: str | None = None):
1006 self._inner = inner
1007 if delimiter == "":
1008 raise ValueError("empty delimiter")
1009 self._delimiter = delimiter
1011 def _process(self, collector: CompletionCollector, /):
1012 pos = len(collector.prefix.split(self._delimiter))
1013 if (
1014 pos
1015 and self._delimiter is None
1016 and collector.prefix
1017 and collector.prefix[-1] in string.whitespace
1018 ):
1019 # `.split(None)` will trim whitespaces at the end.
1020 # Make sure we count those towards the current position in the tuple.
1021 pos += 1
1022 if pos > len(self._inner):
1023 return
1024 if pos > 0:
1025 pos -= 1
1027 collector.split_off_prefix(self._delimiter)
1028 collector.split_off_suffix(self._delimiter)
1029 collector.rsuffix = self._delimiter or " "
1030 collector.rsymbols += self._delimiter or string.whitespace
1032 self._inner[pos]._process(collector)
1034 def _get_completion_model(
1035 self, *, is_many: bool = False
1036 ) -> _OptionSerializer.Model:
1037 if is_many:
1038 return _OptionSerializer.TupleMany(
1039 self._delimiter or " ",
1040 [inner._get_completion_model() for inner in self._inner],
1041 )
1042 else:
1043 return _OptionSerializer.Tuple(
1044 self._delimiter or " ",
1045 [inner._get_completion_model() for inner in self._inner],
1046 )
1049class File(Completer):
1050 """
1051 Completes file paths.
1053 :param extensions:
1054 allowed file extensions, should include the leading dot.
1056 """
1058 def __init__(self, extensions: str | _t.Collection[str] | None = None):
1059 if isinstance(extensions, str):
1060 self._extensions = [extensions]
1061 elif extensions is not None:
1062 self._extensions = list(extensions)
1063 else:
1064 self._extensions = None
1066 def _process(self, collector: CompletionCollector, /):
1067 base, name = os.path.split(collector.prefix)
1068 if base and not base.endswith(os.path.sep):
1069 base += os.path.sep
1070 collector.iprefix += base
1071 collector.prefix = name
1072 collector.suffix = collector.suffix.split(os.sep, maxsplit=1)[0]
1073 resolved = pathlib.Path(base).expanduser().resolve()
1074 rsuffix = collector.rsuffix
1075 if resolved.is_dir():
1076 if name.startswith("."):
1077 collector.rsuffix = ""
1078 collector.add(os.path.curdir + os.path.sep, color_tag="dir")
1079 collector.add(os.path.pardir + os.path.sep, color_tag="dir")
1080 if name.startswith("~"):
1081 collector.rsuffix = ""
1082 collector.add("~" + os.path.sep, color_tag="dir")
1083 try:
1084 for path in resolved.iterdir():
1085 if path.is_dir():
1086 if path.is_symlink():
1087 color_tag = "symlink"
1088 dsuffix = "@"
1089 else:
1090 color_tag = "dir"
1091 dsuffix = ""
1092 collector.rsuffix = ""
1093 collector.add(
1094 path.name + os.sep, color_tag=color_tag, dsuffix=dsuffix
1095 )
1096 elif self._extensions is None or any(
1097 path.name.endswith(ext) for ext in self._extensions
1098 ):
1099 collector.rsuffix = rsuffix
1100 color_tag = None
1101 dsuffix = ""
1102 if path.is_symlink():
1103 color_tag = "symlink"
1104 dsuffix = "@"
1105 elif path.is_file():
1106 if (os.name != "nt" and os.access(path, os.X_OK)) or (
1107 os.name == "nt" and path.suffix == ".exe"
1108 ):
1109 color_tag = "exec"
1110 dsuffix = "*"
1111 else:
1112 color_tag = "file"
1113 elif path.is_socket():
1114 color_tag = "socket"
1115 dsuffix = "="
1116 elif path.is_fifo():
1117 color_tag = "pipe"
1118 dsuffix = "|"
1119 elif path.is_block_device():
1120 color_tag = "block_device"
1121 dsuffix = "#"
1122 elif path.is_char_device():
1123 color_tag = "char_device"
1124 dsuffix = "%"
1125 collector.add(path.name, color_tag=color_tag, dsuffix=dsuffix)
1126 except PermissionError:
1127 return
1129 def _get_completion_model(
1130 self, *, is_many: bool = False
1131 ) -> _OptionSerializer.Model:
1132 return _OptionSerializer.File(
1133 "|".join(extension.lstrip(".") for extension in self._extensions or [])
1134 )
1137class Dir(File):
1138 """
1139 Completes directories.
1141 """
1143 def __init__(self):
1144 super().__init__([])
1146 def _get_completion_model(
1147 self, *, is_many: bool = False
1148 ) -> _OptionSerializer.Model:
1149 return _OptionSerializer.Dir()
1152class _CustomCompleterRegistrar:
1153 def __init__(self) -> None:
1154 self._custom_completer_index = 0
1156 def _register_custom_completer(self) -> int:
1157 index = self._custom_completer_index
1158 self._custom_completer_index += 1
1159 return index
1162class _OptionSerializer(_CustomCompleterRegistrar):
1163 _SPECIAL_SYMBOLS = str.maketrans("\r\n\a\b\t", " ")
1165 def __init__(
1166 self,
1167 flags: list[str],
1168 path: str,
1169 nargs: str | int,
1170 metavar: str | tuple[str, ...],
1171 help: str | yuio.Disabled,
1172 ):
1173 super().__init__()
1175 self._flags = flags
1176 self._path = path
1177 self._nargs = nargs
1178 self._metavar = metavar if isinstance(metavar, tuple) else (metavar,)
1179 self._help: str | yuio.Disabled = help
1181 def dump(self, model: _OptionSerializer.Model):
1182 if self._help is yuio.DISABLED:
1183 desc = "__yuio_hide__"
1184 else:
1185 desc = self._process_help(self._help)
1187 compspec = [
1188 self._path,
1189 " ".join(self._flags),
1190 desc,
1191 " ".join(
1192 re.sub(
1193 r"[\\ ]",
1194 lambda s: "\\S" if s.group() == " " else "\\L",
1195 str(m),
1196 )
1197 or ""
1198 for m in self._metavar
1199 ),
1200 str(self._nargs),
1201 *model.dump(self),
1202 ]
1204 return "\t".join(item.translate(self._SPECIAL_SYMBOLS) for item in compspec)
1206 @staticmethod
1207 def _process_help(help: str):
1208 if (
1209 len(help) > 2
1210 and help[0].isupper()
1211 and (help[1].islower() or help[1].isspace())
1212 ):
1213 help = help[0].lower() + help[1:]
1214 if help.endswith(".") and not help.endswith(".."):
1215 help = help[:-1]
1216 if (index := help.find("\n\n")) != -1:
1217 help = help[:index]
1218 return yuio.string.strip_color_tags(help)
1220 @staticmethod
1221 def _dump_nested(compspec: _t.Iterable[object], s: _OptionSerializer) -> list[str]:
1222 contents = []
1224 for item in compspec:
1225 contents.extend(_OptionSerializer._dump_nested_item(item, s))
1227 return contents
1229 @staticmethod
1230 def _dump_nested_item(item: object, s: _OptionSerializer) -> list[str]:
1231 contents = []
1233 if isinstance(item, _OptionSerializer.Model):
1234 contents.extend(item.dump(s))
1235 elif isinstance(item, list):
1236 contents.append(str(len(item)))
1237 for sub_item in item:
1238 contents.extend(_OptionSerializer._dump_nested_item(sub_item, s))
1239 elif isinstance(item, tuple):
1240 for sub_item in item:
1241 contents.extend(_OptionSerializer._dump_nested_item(sub_item, s))
1242 else:
1243 contents.append(str(item))
1245 return contents
1247 @dataclass
1248 class Model:
1249 tag: typing.ClassVar[str] = "-"
1251 def __init_subclass__(cls, tag: str = "-", **kwargs):
1252 super().__init_subclass__(**kwargs)
1253 cls.tag = tag
1255 def dump(self, s: _OptionSerializer) -> list[str]:
1256 contents = _OptionSerializer._dump_nested(
1257 (getattr(self, field.name) for field in dataclasses.fields(self)), s
1258 )
1259 return [self.tag, str(len(contents)), *contents]
1261 def get_completer_at_index(
1262 self, s: _CustomCompleterRegistrar, index: int
1263 ) -> Completer | None:
1264 return None
1266 @dataclass
1267 class File(Model, tag="f"):
1268 ext: str
1270 @dataclass
1271 class Dir(Model, tag="d"):
1272 pass
1274 @dataclass
1275 class Choice(Model, tag="c"):
1276 choices: list[str]
1278 def dump(self, s: _OptionSerializer) -> list[str]:
1279 return [self.tag, str(len(self.choices)), *self.choices]
1281 @dataclass
1282 class ChoiceWithDesc(Model, tag="cd"):
1283 choices: list[tuple[str, str]]
1285 def dump(self, s: _OptionSerializer) -> list[str]:
1286 return [
1287 self.tag,
1288 str(len(self.choices) * 2),
1289 *[c[0] for c in self.choices],
1290 *[s._process_help(c[1]) for c in self.choices],
1291 ]
1293 @dataclass
1294 class Git(Model, tag="g"):
1295 modes: str
1297 @dataclass
1298 class List(Model, tag="l"):
1299 delim: str
1300 inner: _OptionSerializer.Model
1302 def get_completer_at_index(
1303 self, s: _CustomCompleterRegistrar, index: int
1304 ) -> Completer | None:
1305 return self.inner.get_completer_at_index(s, index)
1307 @dataclass
1308 class ListMany(List, tag="lm"):
1309 pass
1311 @dataclass
1312 class Tuple(Model, tag="t"):
1313 delim: str
1314 inner: list[_OptionSerializer.Model]
1316 def get_completer_at_index(
1317 self, s: _CustomCompleterRegistrar, index: int
1318 ) -> Completer | None:
1319 for inner in self.inner:
1320 if completer := inner.get_completer_at_index(s, index):
1321 return completer
1322 return None
1324 @dataclass
1325 class TupleMany(Tuple, tag="tm"):
1326 pass
1328 @dataclass
1329 class Alternative(Model, tag="a"):
1330 alternatives: list[tuple[str, _OptionSerializer.Model]]
1332 def get_completer_at_index(
1333 self, s: _CustomCompleterRegistrar, index: int
1334 ) -> Completer | None:
1335 for _, inner in self.alternatives:
1336 if completer := inner.get_completer_at_index(s, index):
1337 return completer
1338 return None
1340 @dataclass
1341 class CustomCompleter(Model, tag="cc"):
1342 completer: Completer
1344 def dump(self, s: _OptionSerializer) -> list[str]:
1345 return [
1346 self.tag,
1347 "1",
1348 json.dumps(
1349 {
1350 "path": s._path,
1351 "flags": s._flags,
1352 "index": s._register_custom_completer(),
1353 }
1354 ),
1355 ]
1357 def get_completer_at_index(
1358 self, s: _CustomCompleterRegistrar, index: int
1359 ) -> Completer | None:
1360 this_index = s._register_custom_completer()
1361 if index == this_index:
1362 return self.completer
1363 else:
1364 return None
1367class _ProgramSerializer:
1368 def __init__(self, path: str = "") -> None:
1369 self._path = path
1370 self._lines: list[str] = []
1371 self._positionals = 0
1372 self._subcommands: dict[
1373 str, tuple[_ProgramSerializer, bool, str | yuio.Disabled]
1374 ] = {}
1376 def add_option(
1377 self,
1378 flags: list[str] | yuio.Positional,
1379 nargs: str | int,
1380 metavar: str | tuple[str, ...],
1381 help: str | yuio.Disabled,
1382 completer: Completer | None,
1383 is_many: bool,
1384 ):
1385 if flags is yuio.POSITIONAL:
1386 flags = [str(self._positionals)]
1387 self._positionals += 1
1388 if completer is None:
1389 model = _OptionSerializer.Model()
1390 else:
1391 model = completer._get_completion_model(is_many=is_many)
1392 self._add_option(flags, nargs, metavar, help, model)
1394 def _add_option(
1395 self,
1396 flags: list[str],
1397 nargs: str | int,
1398 metavar: str | tuple[str, ...],
1399 help: str | yuio.Disabled,
1400 model: _OptionSerializer.Model,
1401 ):
1402 self._lines.append(
1403 _OptionSerializer(flags, self._path, nargs, metavar, help).dump(model)
1404 )
1406 def add_subcommand(
1407 self,
1408 name: str,
1409 is_alias: bool,
1410 help: str | yuio.Disabled,
1411 ):
1412 serializer = _ProgramSerializer(f"{self._path}/{name}")
1413 self._subcommands[name] = (serializer, is_alias, help)
1414 return serializer
1416 def _dump(self):
1417 if self._subcommands:
1418 self._add_option(
1419 ["c"],
1420 1,
1421 "<subcommand>",
1422 "Subcommand.",
1423 _OptionSerializer.ChoiceWithDesc(
1424 [
1425 (name, help)
1426 for name, (_, is_alias, help) in self._subcommands.items()
1427 if not is_alias and help is not yuio.DISABLED
1428 ]
1429 ),
1430 )
1432 for _, (serializer, _, _) in self._subcommands.items():
1433 self._lines.extend(serializer._dump())
1435 return self._lines
1437 def dump(self):
1438 return "\n".join(self._dump())
1441_PROG_ESCAPE = str.maketrans(
1442 string.punctuation + string.whitespace,
1443 "_" * (len(string.punctuation) + len(string.whitespace)),
1444)
1447def _run_completer_at_index(completer: Completer, is_many: bool, index: int, word: str):
1448 registrar = _CustomCompleterRegistrar()
1449 model = completer._get_completion_model(is_many=is_many)
1450 completer_at_index = model.get_completer_at_index(registrar, index)
1451 if completer_at_index:
1452 # It's up to user's shell to do corrections and derive common prefix.
1453 completions = completer.complete(
1454 word, len(word), do_corrections=False, derive_common_prefix=False
1455 )
1456 for completion in completions:
1457 print(
1458 f"{completion.iprefix}{completion.completion}{completion.isuffix}\t{completion.comment or ''}",
1459 file=sys.__stdout__,
1460 )
1463def _write_completions(compdata: str, prog: str | None = None, shell: str = "all"):
1464 import yuio.io
1466 true_prog = prog or pathlib.Path(sys.argv[0]).stem
1467 prog = (prog or pathlib.Path(sys.argv[0]).stem).translate(_PROG_ESCAPE)
1469 if pathlib.Path(sys.argv[0]).stem == "__main__":
1470 yuio.io.failure(
1471 "You've invoked this program as a python module, most likely with "
1472 "`python -m <module>`. For completions to work, the program "
1473 "must be invoked as a command in your `$PATH`"
1474 )
1475 sys.exit(1)
1476 if not prog:
1477 yuio.io.failure("Failed to generate completion because program name is empty")
1478 sys.exit(1)
1479 if not re.match(r"^[a-zA-Z0-9_-]+$", prog):
1480 yuio.io.failure(
1481 "Failed to generate completion due to "
1482 "forbidden characters in program name: `%r`",
1483 prog,
1484 )
1485 sys.exit(1)
1487 if shell == "uninstall":
1488 shell = "all"
1489 yuio.io.heading("Uninstalling completions for `%s`", true_prog)
1490 install = False
1491 else:
1492 yuio.io.heading("Generating completions for `%s`", true_prog)
1493 install = True
1495 if not shutil.which(true_prog):
1496 yuio.io.warning(
1497 "Program `%s` is not in your `$PATH`. Completions might not be able "
1498 "to initialize",
1499 true_prog,
1500 )
1502 if os.name == "nt":
1503 data_home = cache_home = config_home = pathlib.Path(
1504 os.environ.get("LOCALAPPDATA") or (pathlib.Path.home() / "AppData/Local")
1505 )
1506 else:
1507 data_home = pathlib.Path(
1508 os.environ.get("XDG_DATA_HOME") or (pathlib.Path.home() / ".local/share")
1509 )
1510 cache_home = pathlib.Path(
1511 os.environ.get("XDG_CACHE_HOME") or (pathlib.Path.home() / ".cache")
1512 )
1513 config_home = pathlib.Path(
1514 os.environ.get("XDG_CONFIG_HOME") or (pathlib.Path.home() / ".config")
1515 )
1517 compdata_path = data_home / f"yuio/{prog}.compdata.tsv"
1519 if install:
1520 task_heading = "Installing completions"
1521 else:
1522 task_heading = "Uninstalling completions"
1524 with yuio.io.Task(task_heading) as t:
1525 if install:
1526 os.makedirs(data_home / "yuio", exist_ok=True)
1527 compdata_path.write_text(compdata)
1528 yuio.io.info(
1529 "Wrote <c note>completion data</c> to <c path>%s</c>", compdata_path
1530 )
1531 elif compdata_path.exists():
1532 os.remove(compdata_path)
1533 yuio.io.info("Removed <c path>%s</c>", compdata_path)
1535 if shell in ["all", "bash"]:
1536 t.comment("Bash")
1537 _write_bash_script(
1538 prog,
1539 true_prog,
1540 install,
1541 compdata_path,
1542 data_home,
1543 cache_home,
1544 config_home,
1545 )
1546 if shell in ["all", "zsh"]:
1547 t.comment("Zsh")
1548 _write_zsh_script(
1549 prog,
1550 true_prog,
1551 install,
1552 compdata_path,
1553 data_home,
1554 cache_home,
1555 config_home,
1556 )
1557 if shell in ["all", "fish"]:
1558 t.comment("Fish")
1559 _write_fish_script(
1560 prog,
1561 true_prog,
1562 install,
1563 compdata_path,
1564 data_home,
1565 cache_home,
1566 config_home,
1567 )
1568 if shell in ["all", "pwsh"]:
1569 t.comment("PowerShell")
1570 _write_pwsh_script(
1571 prog,
1572 true_prog,
1573 install,
1574 compdata_path,
1575 data_home,
1576 cache_home,
1577 config_home,
1578 )
1580 yuio.io.success("All done! Please restart your shell for changes to take effect.")
1581 if install:
1582 yuio.io.info("Run `%s --completions uninstall` to undo all changes.", prog)
1585def _write_bash_script(
1586 prog: str,
1587 true_prog: str,
1588 install: bool,
1589 compdata_path: pathlib.Path,
1590 data_home: pathlib.Path,
1591 cache_home: pathlib.Path,
1592 config_home: pathlib.Path,
1593):
1594 import yuio.exec
1595 import yuio.io
1597 if os.name == "nt":
1598 yuio.io.warning(
1599 "Skipped <c note>Bash</c>: completion script doesn't support windows"
1600 )
1601 return
1603 if install and not shutil.which("bash"):
1604 yuio.io.warning("Skipped <c note>Bash</c>: `bash` command is not available")
1605 return
1607 try:
1608 bash_completions_home = yuio.exec.exec(
1609 "bash",
1610 "-lc",
1611 'echo -n "${BASH_COMPLETION_USER_DIR:-${XDG_DATA_HOME:-$HOME/.local/share}/bash-completion}/completions/"',
1612 ).splitlines()[-1]
1613 except (subprocess.CalledProcessError, IndexError, FileNotFoundError):
1614 bash_completions_home = data_home / "bash-completion/completions/"
1615 bash_completions_home = pathlib.Path(bash_completions_home)
1616 script_dest = bash_completions_home / true_prog
1618 if install:
1619 _write_script(script_dest, "complete.bash", prog, true_prog, compdata_path)
1620 yuio.io.info("Wrote <c note>Bash</c> script to <c path>%s</c>", script_dest)
1621 elif script_dest.exists():
1622 os.remove(script_dest)
1623 yuio.io.info("Removed <c path>%s</c>", script_dest)
1626def _write_zsh_script(
1627 prog: str,
1628 true_prog: str,
1629 install: bool,
1630 compdata_path: pathlib.Path,
1631 data_home: pathlib.Path,
1632 cache_home: pathlib.Path,
1633 config_home: pathlib.Path,
1634):
1635 import yuio.exec
1636 import yuio.io
1638 if os.name == "nt":
1639 yuio.io.warning(
1640 "Skipped <c note>Zsh</c>: completion script doesn't support windows"
1641 )
1642 return
1644 if install and not shutil.which("zsh"):
1645 yuio.io.warning("Skipped <c note>Zsh</c>: `zsh` command is not available")
1646 return
1648 needs_cache_cleanup = False
1650 zsh_completions_home = data_home / "zsh/completions"
1652 if not zsh_completions_home.exists():
1653 zsh_completions_home.mkdir(parents=True)
1654 # Completions home needs rwxr-xr-x, otherwise zsh will not load
1655 # our completion scripts.
1656 zsh_completions_home.chmod(mode=0o755)
1658 script_dest = zsh_completions_home / ("_" + true_prog)
1660 if install:
1661 needs_cache_cleanup = True
1662 _write_script(script_dest, "complete.zsh", prog, true_prog, compdata_path)
1663 yuio.io.info("Wrote <c note>Zsh</c> script to <c path>%s</c>", script_dest)
1664 elif script_dest.exists():
1665 needs_cache_cleanup = True
1667 os.remove(script_dest)
1668 yuio.io.info("Removed <c path>%s</c>", script_dest)
1670 try:
1671 fpath = (
1672 yuio.exec.exec(
1673 "zsh",
1674 "-lc",
1675 "echo -n $FPATH",
1676 )
1677 .splitlines()[-1]
1678 .split(":")
1679 )
1680 except (subprocess.CalledProcessError, IndexError, FileNotFoundError):
1681 fpath = []
1683 try:
1684 zhome = yuio.exec.exec(
1685 "zsh",
1686 "-lc",
1687 "echo -n ${ZDOTDIR:-$HOME}",
1688 ).splitlines()[-1]
1689 except (subprocess.CalledProcessError, IndexError, FileNotFoundError):
1690 zhome = pathlib.Path.home()
1692 zhome = pathlib.Path(zhome)
1693 zprofile_path = zhome / ".zprofile"
1694 zprofile_append_text = f"\nfpath=({zsh_completions_home} $fpath)\n"
1696 if install:
1697 if str(zsh_completions_home) not in fpath:
1698 with open(zprofile_path, "a") as f:
1699 f.write(zprofile_append_text)
1700 yuio.io.info(
1701 "<c note>Note:</c> modified <c path>%s</c> to add <c path>%s</c> to `fpath`",
1702 zprofile_path,
1703 zsh_completions_home,
1704 )
1705 elif zprofile_path.exists():
1706 zprofile_text = zprofile_path.read_text()
1707 if zprofile_append_text in zprofile_text:
1708 yuio.io.info(
1709 "<c note>Note:</c> modifications to <c path>%s</c> are not removed"
1710 " because other completions might rely on them",
1711 zprofile_path,
1712 )
1714 if not needs_cache_cleanup:
1715 return
1717 # Try to remove completions cache from the most common places.
1718 for zcomp_basedir in [zhome, cache_home / "prezto"]:
1719 if not zcomp_basedir.exists() or not zcomp_basedir.is_dir():
1720 continue
1721 for file in zcomp_basedir.iterdir():
1722 if file.is_file() and re.match(r"^\.?zcompdump", file.name):
1723 os.remove(file)
1724 yuio.io.info(
1725 "<c note>Note:</c> deleted Zsh completions cache at <c path>%s</c>",
1726 file,
1727 )
1729 try:
1730 # Run zsh with the right flags in case zshrc runs compinit.
1731 # If after generating completions user runs `zsh` without the `-l` flag,
1732 # our changes to fpath will not be visible, and compinit will dump
1733 # an invalid version of cache. To avoid this, we call zsh ourselves
1734 # before the user has a chance to do it. Notice, though, that we don't
1735 # run `compdump`. This is because we can't be sure that the user uses
1736 # the default cache path (~/.zcompdump).
1737 yuio.exec.exec("zsh", "-lc", "true")
1738 except (subprocess.CalledProcessError, FileNotFoundError):
1739 pass
1742def _write_fish_script(
1743 prog: str,
1744 true_prog: str,
1745 install: bool,
1746 compdata_path: pathlib.Path,
1747 data_home: pathlib.Path,
1748 cache_home: pathlib.Path,
1749 config_home: pathlib.Path,
1750):
1751 import yuio.io
1753 if os.name == "nt":
1754 yuio.io.warning(
1755 "Skipped <c note>Fish</c>: completion script doesn't support windows"
1756 )
1757 return
1759 if install and not shutil.which("fish"):
1760 yuio.io.warning("Skipped <c note>Fish</c>: `fish` command is not available")
1761 return
1763 fish_completions_home = data_home / "fish/vendor_completions.d"
1764 script_dest = fish_completions_home / (true_prog + ".fish")
1766 if install:
1767 _write_script(script_dest, "complete.fish", prog, true_prog, compdata_path)
1768 yuio.io.info("Wrote <c note>Fish</c> script to <c path>%s</c>", script_dest)
1769 elif script_dest.exists():
1770 os.remove(script_dest)
1771 yuio.io.info("Removed <c path>%s</c>", script_dest)
1774def _write_pwsh_script(
1775 prog: str,
1776 true_prog: str,
1777 install: bool,
1778 compdata_path: pathlib.Path,
1779 data_home: pathlib.Path,
1780 cache_home: pathlib.Path,
1781 config_home: pathlib.Path,
1782):
1783 import yuio.exec
1784 import yuio.io
1786 if shutil.which("pwsh"):
1787 command = "pwsh"
1788 elif shutil.which("powershell"):
1789 command = "powershell"
1790 else:
1791 yuio.io.warning(
1792 "Skipped <c note>PowerShell</c>: `pwsh` command is not available"
1793 )
1794 return
1796 try:
1797 pwsh_data = (
1798 yuio.exec.exec(
1799 command,
1800 "-Command",
1801 'Write-Host "$($PSVersionTable.PSVersion);$PROFILE"',
1802 )
1803 .splitlines()[-1]
1804 .strip()
1805 )
1806 except (subprocess.CalledProcessError, IndexError) as e:
1807 yuio.io.warning(
1808 "Skipped <c note>PowerShell</c>: failed to get powershell `$PROFILE` path: %s",
1809 e,
1810 )
1811 return
1812 except FileNotFoundError:
1813 yuio.io.warning("Skipped <c note>PowerShell</c>: `pwsh` command not found")
1814 return
1815 if match := re.match(r"^(\d+(?:\.\d+)*);(.*)$", pwsh_data):
1816 version = match.group(1)
1817 profile_s = match.group(2)
1818 else:
1819 yuio.io.warning(
1820 "Skipped <c note>PowerShell</c>: can't determine powershell version"
1821 )
1822 return
1823 if not profile_s:
1824 yuio.io.warning(
1825 "Skipped <c note>PowerShell</c>: powershell `$PROFILE` path is empty"
1826 )
1827 return
1828 if tuple(int(v) for v in version.split(".")) < (5, 0, 0):
1829 yuio.io.warning(
1830 "Skipped <c note>PowerShell</c>: completions script requires "
1831 "PowerShell 5 or newer, you have %s",
1832 version,
1833 )
1834 return
1836 profile_path = pathlib.Path(profile_s).expanduser().resolve()
1837 profile_path.parent.mkdir(exist_ok=True, parents=True)
1839 data_dir = data_home / "yuio/pwsh"
1840 loader_path = data_dir / "LoadCompletions.ps1"
1841 script_dest = data_dir / f"_{true_prog}.ps1"
1842 if install:
1843 _write_script(script_dest, "complete.ps1", prog, true_prog, compdata_path)
1844 yuio.io.info(
1845 "Wrote <c note>PowerShell</c> script to <c path>%s</c>", script_dest
1846 )
1847 _write_pwsh_loader(loader_path, data_dir)
1848 elif script_dest.exists():
1849 os.remove(script_dest)
1850 yuio.io.info("Removed <c path>%s</c>", script_dest)
1852 try:
1853 data_dirs = [
1854 pathlib.Path(f).expanduser().resolve()
1855 for f in yuio.exec.exec(
1856 command,
1857 "-Command",
1858 'Write-Host ($_YUIO_COMPL_V1_INIT_PATHS -join "`n")',
1859 )
1860 .strip()
1861 .splitlines()
1862 ]
1863 except (subprocess.CalledProcessError, FileNotFoundError):
1864 return
1866 pwsh_profile_append_text = f"\n. {loader_path}\n"
1868 if install:
1869 if data_dir not in data_dirs:
1870 with open(profile_path, "a") as f:
1871 f.write(pwsh_profile_append_text)
1872 yuio.io.info(
1873 "<c note>Note:</c> modified <c path>%s</c> to call <c path>%s</c> on startup",
1874 profile_path,
1875 loader_path,
1876 )
1877 elif profile_path.exists():
1878 pwsh_profile_text = profile_path.read_text()
1879 if pwsh_profile_append_text in pwsh_profile_text:
1880 yuio.io.info(
1881 "<c note>Note:</c> modifications to <c path>%s</c> are not removed"
1882 " because other completions might rely on them",
1883 profile_path,
1884 )
1887def _write_script(
1888 path: pathlib.Path,
1889 script_name: str,
1890 prog: str,
1891 true_prog: str,
1892 compdata_path: pathlib.Path,
1893):
1894 script_template = _read_script(script_name)
1895 script = (
1896 (script_template)
1897 .replace("@prog@", prog)
1898 .replace("@true_prog@", true_prog)
1899 .replace("@data@", str(compdata_path))
1900 .replace("@version@", yuio.__version__)
1901 )
1903 path.parent.mkdir(exist_ok=True, parents=True)
1904 path.write_text(script)
1905 path.chmod(0o755)
1908def _read_script(script_name: str):
1909 import zipfile
1910 import zipimport
1912 if isinstance(__loader__, zipimport.zipimporter):
1913 # Yuio is imported directly from a wheel.
1914 with zipfile.ZipFile(__loader__.archive) as archive:
1915 script_template = archive.read("yuio/_complete/" + script_name)
1916 return script_template.decode()
1917 else:
1918 script_template_path = pathlib.Path(__file__).parent / "_complete" / script_name
1919 return script_template_path.read_text()
1922def _write_pwsh_loader(loader_path: pathlib.Path, data_dir: pathlib.Path):
1923 import yuio.io
1925 loader_template_path = pathlib.Path(__file__).parent / "_complete/complete_init.ps1"
1926 loader_template = loader_template_path.read_text()
1928 loader_version = re.search(
1929 r"^\s*#\s*LOADER_VERSION:\s*(\d+)\s*$", loader_template, re.MULTILINE
1930 )
1931 assert loader_version
1933 if loader_path.exists() and loader_path.is_file():
1934 current_loader = loader_path.read_text()
1935 current_version_s = re.search(
1936 r"^\s*#\s*LOADER_VERSION:\s*(\d+)\s*$", current_loader, re.MULTILINE
1937 )
1939 if current_version_s is None:
1940 yuio.io.warning(
1941 "<c note>Note:</c> can't determine version of <c path>%s</c>, "
1942 "file will be overridden",
1943 loader_path,
1944 )
1945 elif int(loader_version.group(1)) <= int(current_version_s.group(1)):
1946 return
1948 loader_template = loader_template.replace("@data@", str(data_dir))
1949 loader_path.write_text(loader_template)
1950 loader_path.chmod(0o755)
1951 yuio.io.info("Wrote <c note>PowerShell</c> script to <c path>%s</c>", loader_path)