Coverage for yuio / complete.py: 78%
753 statements
« prev ^ index » next coverage.py v7.13.3, created at 2026-02-03 15:42 +0000
« prev ^ index » next coverage.py v7.13.3, created at 2026-02-03 15:42 +0000
1# Yuio project, MIT license.
2#
3# https://github.com/taminomara/yuio/
4#
5# You're free to copy this file to your project and edit it for your needs,
6# just keep this copyright line please :3
8"""
9This module provides autocompletion functionality for widgets and CLI.
12Completer basics
13----------------
15All completers are derived from the :class:`Completer` base class
16with a simple interface:
18.. autoclass:: Completer
19 :members:
21.. autoclass:: Completion
22 :members:
25Completers
26----------
28Yuio provides basic completers that cover most of the cases:
30.. autoclass:: Empty
32.. autoclass:: Alternative
34.. autoclass:: Choice
36.. autoclass:: Option
37 :members:
39.. autoclass:: List
41.. autoclass:: Tuple
43.. autoclass:: File
45.. autoclass:: Dir
48Implementing your own completer
49-------------------------------
51To implement a custom completer, subclass :class:`Completer` and implement
52its :meth:`~Completer._process` method.
54.. note::
56 When using a custom completer for CLI flags in :mod:`yuio.app`,
57 completion script will invoke your program with special arguments
58 to run the completer and get its result.
60.. class:: Completer
61 :noindex:
63 .. automethod:: _process
65.. autoclass:: CompletionCollector
67"""
69from __future__ import annotations
71import abc
72import contextlib
73import dataclasses
74import functools
75import json
76import math
77import os
78import pathlib
79import re
80import shutil
81import string
82import subprocess
83import sys
84from dataclasses import dataclass
86import yuio
87import yuio.string
88from yuio.util import commonprefix as _commonprefix
90import typing
91import yuio._typing_ext as _tx
92from typing import TYPE_CHECKING
94if TYPE_CHECKING:
95 import typing_extensions as _t
96else:
97 from yuio import _typing as _t
# Public names of this module, listed alphabetically.
__all__ = [
    "Alternative",
    "Choice",
    "Completer",
    "Completion",
    "CompletionCollector",
    "Dir",
    "Empty",
    "File",
    "List",
    "Option",
    "Tuple",
]
114@dataclass(frozen=True, slots=True)
115@functools.total_ordering
116class Completion:
117 """
118 A single completion.
120 """
122 iprefix: str
123 """
124 See :class:`CompletionCollector.iprefix` for details.
126 """
128 completion: str
129 """
130 Text of the completion.
132 """
134 rsuffix: str
135 """
136 See :class:`CompletionCollector.rsuffix` for details.
138 """
140 rsymbols: str
141 """
142 See :class:`CompletionCollector.rsymbols` for details.
144 """
146 isuffix: str
147 """
148 See :class:`CompletionCollector.isuffix` for details.
150 """
152 comment: str | None
153 """
154 Short comment displayed alongside the completion.
156 """
158 dprefix: str
159 """
160 Prefix that will be displayed before :attr:`~Completion.completion`
161 when listing completions, but will not be inserted once completion
162 is applied.
164 """
166 dsuffix: str
167 """
168 Like :attr:`~Completion.dprefix`, but it's a suffix.
170 """
172 group_id: _tx.SupportsLt[_t.Any] = dataclasses.field(repr=False)
173 """
174 Group id, used to sort completions.
176 Actual content of this property is an implementation detail.
178 """
180 group_color_tag: str | None
181 """
182 Color tag that's used when displaying this completion.
184 See :meth:`CompletionCollector.add_group` for details.
186 """
188 def __lt__(self, other: Completion) -> bool:
189 """
190 Completions are ordered by their groups and then alphabetically.
192 """
194 return self.group_id < other.group_id or (
195 self.group_id == other.group_id and self.completion < other.completion
196 )
199@dataclass(init=False, eq=False, repr=False, match_args=False)
200class CompletionCollector:
201 """
202 A class that collects completions as completers are running.
204 The text that is being completed is split into four parts, similar
205 to what you might see in ZSH completion widgets. The main two are:
207 .. autoattribute:: prefix
209 .. autoattribute:: suffix
211 When completions are added to the collector, they are checked against
212 the current prefix to determine if they match the entered text. If they
213 do, the completion system will replace text from `prefix` and `suffix`
214 with the new completion string.
216 The two additional parts are:
218 .. autoattribute:: iprefix
220 .. autoattribute:: isuffix
222 For example, suppose you're completing a second element
223 of a colon-separated list. The list completer will set up
224 the collector so that `prefix` and `suffix` contain parts of the
225 current list element, while `iprefix` and `isuffix` contain
226 the rest of the elements:
228 .. code-block:: text
230 list_element_1:list_el|ement_2:list_element_3
231 └┬────────────┘└┬────┘│└┬────┘└┬────────────┘
232 iprefix prefix │ suffix isuffix
233 └ cursor
235 Now, if the completer adds a completion ``"list_elements"``,
236 this text will replace the `prefix` and `suffix`, but not `iprefix`
237 and `isuffix`. So, after the completion is applied, the string will
238 look like so:
240 .. code-block:: text
242 list_element_1:list_elements:list_element_3
243 └┬──────────┘
244 this got replaced
246 Finally, there is `rsuffix`:
248 .. autoattribute:: rsuffix
250 .. autoattribute:: rsymbols
252 So, when completing a colon-separated list, colons will be added and removed
253 automatically, similar to how ZSH does it.
255 .. autoattribute:: dedup_words
257 .. autoattribute:: full_prefix
259 .. autoattribute:: full_suffix
261 .. autoattribute:: text
263 .. autoattribute:: num_completions
265 .. automethod:: add
267 .. automethod:: add_group
269 .. automethod:: save_state
271 .. automethod:: split_off_prefix
273 .. automethod:: split_off_suffix
275 .. automethod:: finalize
277 """
279 iprefix: str
280 """
281 Contains text that goes before the :attr:`~CompletionCollector.prefix`.
283 This prefix is not considered when checking whether a completion
284 matches a text, and it is not replaced by the completion. It will also
285 not be shown in the table of completions.
287 This prefix starts empty, and then parts of :attr:`~CompletionCollector.prefix`
288 are moved to :attr:`~CompletionCollector.iprefix` as completers split it into
289 list elements.
291 """
293 prefix: str
294 """
295 Portion of the completed text before the cursor.
297 """
299 suffix: str
300 """
301 Portion of the completed text after the cursor.
303 """
305 rsuffix: str
306 """
307 Starts empty, and may be set to hold a list separator.
309 This suffix will be added after the completion. However, it will be automatically
310 removed if the user types one of :attr:`CompletionCollector.rsymbols`,
311 or moves cursor, or alters input in some other way.
313 This property is mutable and can be changed by completers.
315 """
317 rsymbols: str
318 """
319 If user types one of the symbols from this string,
320 :attr:`~.CompletionCollector.rsuffix` will be removed.
322 This property is mutable and can be changed by completers.
324 """
326 isuffix: str
327 """
328 Similar to :attr:`CompletionCollector.iprefix`, but for suffixes.
330 """
332 dedup_words: frozenset[str]
333 """
334 Completions from this set will not be added. This is useful
335 when completing lists of unique values.
337 This property is mutable and can be changed by completers.
339 """
341 # Internal fields.
342 _group_id: int
343 _group_sorted: bool
344 _group_color_tag: str | None
346 def __init__(self, text: str, pos: int, /):
347 self.iprefix = ""
348 self.prefix = text[:pos]
349 self.suffix = text[pos:]
350 self.rsuffix = ""
351 self.rsymbols = ""
352 self.isuffix = ""
353 self.dedup_words = frozenset()
355 self._group_id = 0
356 self._group_sorted = True
357 self._group_color_tag = None
359 self._completions: list[Completion] = []
361 @property
362 def full_prefix(self) -> str:
363 """
364 Portion of the final completed text that goes before the cursor.
366 """
368 return self.iprefix + self.prefix
370 @property
371 def full_suffix(self) -> str:
372 """
373 Portion of the final completed text that goes after the cursor.
375 """
377 return self.suffix + self.isuffix
379 @property
380 def text(self) -> str:
381 """
382 Portion of the text that is being autocompleted.
384 """
386 return self.prefix + self.suffix
388 @contextlib.contextmanager
389 def save_state(self):
390 """
391 Save current state of the collector, i.e. prefixes,
392 suffixes, etc., upon entering this context manager,
393 then restore state upon exiting.
395 Use this context manager when you need to call nested
396 completers more than once to prevent changes made in
397 one nested completer bleeding out into another
398 nested completer.
400 """
402 state = {f.name: getattr(self, f.name) for f in dataclasses.fields(self)}
404 try:
405 yield
406 finally:
407 for name, value in state.items():
408 setattr(self, name, value)
410 def add(
411 self,
412 completion: str,
413 /,
414 *,
415 comment: str | None = None,
416 dprefix: str = "",
417 dsuffix: str = "",
418 color_tag: str | None = None,
419 ):
420 """
421 Add a new completion.
423 :param completion:
424 completed text without :attr:`~CompletionCollector.iprefix`
425 and :attr:`~CompletionCollector.isuffix`. This text will replace
426 :attr:`~CompletionCollector.prefix` and :attr:`~CompletionCollector.suffix`.
427 :param comment:
428 additional comment that will be displayed near the completion.
429 :param color_tag:
430 allows overriding color tag from the group.
432 """
434 if (
435 completion
436 and completion not in self.dedup_words
437 and completion.startswith(self.prefix)
438 ):
439 self._add(
440 completion,
441 comment=comment,
442 dprefix=dprefix,
443 dsuffix=dsuffix,
444 color_tag=color_tag,
445 )
447 def _add(
448 self,
449 completion: str,
450 /,
451 *,
452 comment: str | None = None,
453 dprefix: str = "",
454 dsuffix: str = "",
455 color_tag: str | None = None,
456 ):
457 if not self.isuffix or self.isuffix[0] in string.whitespace:
458 # Only add `rsuffix` if we're at the end of an array element.
459 # Don't add `rsuffix` if we're in the middle of an array, unless the array
460 # is separated by spaces.
461 rsuffix = self.rsuffix
462 rsymbols = self.rsymbols
463 else:
464 rsuffix = ""
465 rsymbols = ""
467 if self._group_sorted:
468 group_id = (self._group_id, 0)
469 else:
470 group_id = (self._group_id, len(self._completions))
472 if color_tag is None:
473 color_tag = self._group_color_tag
475 self._completions.append(
476 Completion(
477 iprefix=self.iprefix,
478 completion=completion,
479 rsuffix=rsuffix,
480 rsymbols=rsymbols,
481 isuffix=self.isuffix,
482 comment=comment,
483 dprefix=dprefix,
484 dsuffix=dsuffix,
485 group_id=group_id,
486 group_color_tag=color_tag,
487 )
488 )
490 def add_group(self, /, *, sorted: bool = True, color_tag: str | None = None):
491 """
492 Add a new completions group.
494 All completions added after call to this method will be placed to the new group.
495 They will be grouped together, and colored according to the group's color tag.
497 :param sorted:
498 controls whether completions in the new group
499 should be sorted lexicographically.
500 :param color_tag:
501 which color tag should be used to display completions
502 and their help messages for this group.
504 See :attr:`yuio.widget.Option.color_tag` for details.
506 """
508 self._group_id += 1
509 self._group_sorted = sorted
510 self._group_color_tag = color_tag
512 @property
513 def num_completions(self) -> int:
514 """
515 Number of completions added so far.
517 """
519 return len(self._completions)
521 def split_off_prefix(self, delim: str | None = None, /):
522 """
523 Move everything up to the last occurrence of `delim`
524 from :attr:`~CompletionCollector.prefix`
525 to :attr:`~CompletionCollector.iprefix`.
527 :param delim:
528 delimiter to split off; :data:`None` value splits off on any whitespace
529 character, similar to :meth:`str.rsplit`.
531 """
533 delim = delim or " "
534 parts = self.prefix.rsplit(delim, maxsplit=1)
535 if len(parts) > 1:
536 self.iprefix += parts[0] + delim
537 self.prefix = parts[1]
539 def split_off_suffix(self, delim: str | None = None, /):
540 """
541 Move everything past the first occurrence of `delim`
542 from :attr:`~CompletionCollector.suffix`
543 to :attr:`~CompletionCollector.isuffix`.
545 :param delim:
546 delimiter to split off; :data:`None` value splits off on any whitespace
547 character, similar to :meth:`str.split`.
549 """
551 delim = delim or " "
552 parts = self.suffix.split(delim, maxsplit=1)
553 if len(parts) > 1:
554 self.suffix = parts[0]
555 self.isuffix = delim + parts[1] + self.isuffix
557 def finalize(self, *, derive_common_prefix: bool = True) -> list[Completion]:
558 """
559 Finish collecting completions and return everything that was collected.
561 Do not reuse a collector after it was finalized.
563 :returns:
564 list of completions, sorted by their groups and preferred ordering
565 within each group.
567 If all completions start with a common prefix, a single completion
568 is returned containing this prefix.
570 """
572 if len(self._completions) > 1:
573 c0 = self._completions[0]
575 iprefix = c0.iprefix
576 isuffix = c0.isuffix
578 if (
579 derive_common_prefix
580 and self.full_prefix.startswith(iprefix)
581 and self.full_suffix.endswith(isuffix)
582 and all(
583 c.iprefix == iprefix and c.isuffix == isuffix
584 for c in self._completions
585 )
586 ):
587 # If all completions have the same `iprefix` and `isuffix`...
588 common_prefix = _commonprefix(
589 list(c.completion for c in self._completions)
590 )
591 if common_prefix and len(iprefix) + len(common_prefix) > len(
592 self.iprefix
593 ) + len(self.prefix):
594 # ...and they have a common prefix that is longer than what's entered so far,
595 # then complete this common prefix.
596 rsuffix = ""
597 rsymbols = ""
598 if all(
599 common_prefix == c.completion and rsuffix == c.rsuffix
600 for c in self._completions
601 ):
602 # If completing common prefix actually fulfills a completion, add `rsuffix` as well.
603 rsuffix = c0.rsuffix
604 rsymbols = c0.rsymbols
605 return [
606 Completion(
607 iprefix=iprefix,
608 completion=common_prefix,
609 rsuffix=rsuffix,
610 rsymbols=rsymbols,
611 isuffix=isuffix,
612 comment=None,
613 dprefix="",
614 dsuffix="",
615 group_id=(0, 0),
616 group_color_tag=None,
617 )
618 ]
620 self._completions.sort()
621 return self._completions
# Limits used by `_CorrectingCollector.add`: a candidate is offered as a correction
# when its edit distance to the entered text does not exceed
# `_MAX_COMPLETION_CORRECTIONS + _MAX_COMPLETION_CORRECTIONS_RATE * average_length`,
# where `average_length` is the mean length of the entered text and the candidate.
_MAX_COMPLETION_CORRECTIONS: int = 1
_MAX_COMPLETION_CORRECTIONS_RATE: float = 1 / 3
@_t.final
class _CorrectingCollector(CompletionCollector):
    """
    Collector that accepts completions which are *close* to the entered text
    instead of requiring them to start with the current prefix.

    Closeness is measured with :func:`_corrections`. Candidates that are within
    one edit are added as regular completions; candidates within a length-dependent
    threshold go into a separate "corrected" group. When any correction was offered,
    :meth:`finalize` also appends the text as it was typed, marked "original".

    """

    def __init__(self, text: str, pos: int):
        super().__init__(text, pos)

        # Set once at least one completion landed in the "corrected" group.
        self._has_corrections = False

    def add(
        self,
        completion: str,
        /,
        *,
        comment: str | None = None,
        dprefix: str = "",
        dsuffix: str = "",
        color_tag: str | None = None,
    ):
        """
        Add `completion` if it is similar enough to the text being completed.

        Parameters have the same meaning as in :meth:`CompletionCollector.add`.
        Empty completions and completions listed in `dedup_words` are skipped.

        """

        if not completion or completion in self.dedup_words:
            return

        a = self.prefix + self.suffix
        b = completion
        corrections = _corrections(a, b)
        threshold = (
            _MAX_COMPLETION_CORRECTIONS
            + _MAX_COMPLETION_CORRECTIONS_RATE * (len(a) + len(b)) / 2
        )

        if corrections <= 1:
            # this is a simple mistype, add it as usual
            self._add(
                completion,
                comment=comment,
                dprefix=dprefix,
                dsuffix=dsuffix,
                color_tag=color_tag,
            )
        elif corrections <= threshold:
            # this is a correction, add it into corrections group
            if comment:
                comment = "corrected: " + comment
            else:
                comment = "corrected"
            # `save_state` restores the group id and color tag afterwards.
            with self.save_state():
                self._group_id = 0xFFFFFFFE  # (big enough) - 1
                self._group_color_tag = "corrected"
                self._add(
                    completion,
                    comment=comment,
                    dprefix=dprefix,
                    dsuffix=dsuffix,
                    color_tag=color_tag,
                )
                self._has_corrections = True

    def finalize(self, *, derive_common_prefix: bool = True) -> list[Completion]:
        """
        Return collected completions, sorted.

        A common prefix is never derived here; `derive_common_prefix` is accepted
        only for compatibility with :meth:`CompletionCollector.finalize`.

        """

        if self._has_corrections:
            c0 = self._completions[0]

            iprefix = ""
            prefix = self.full_prefix
            suffix = self.full_suffix
            isuffix = ""

            # Split the typed text the same way the first completion was split,
            # so that the "original" entry lines up with the other entries.
            if prefix.startswith(c0.iprefix):
                cut = len(c0.iprefix)
                iprefix = prefix[:cut]
                prefix = prefix[cut:]

            if suffix.endswith(c0.isuffix):
                # Index from the front: slicing with `-0` would select the whole
                # string when `c0.isuffix` is empty.
                cut = len(suffix) - len(c0.isuffix)
                isuffix = suffix[cut:]
                suffix = suffix[:cut]

            # If we have corrections, add original value to the end.
            with self.save_state():
                self._group_id = 0xFFFFFFFF  # (big enough)
                self._group_color_tag = "original"
                self.iprefix = iprefix
                self.isuffix = isuffix
                self._add(prefix + suffix, comment="original")

        self._completions.sort()
        return self._completions
714def _corrections(a: str, b: str) -> float:
715 # Damerau–Levenshtein distance (Optimal String Alignment distance)
717 a = a.casefold()
718 b = b.casefold()
719 d = [x[:] for x in [[0.0] * (len(b) + 1)] * (len(a) + 1)]
720 for i in range(len(a) + 1):
721 d[i][0] = i
722 for j in range(len(b) + 1):
723 d[0][j] = j
724 for i in range(1, len(a) + 1):
725 for j in range(1, len(b) + 1):
726 d[i][j] = min(
727 # Add to `a`:
728 d[i - 1][j] + 1,
729 # Add to `b`:
730 d[i][j - 1] + 1,
731 # Replace:
732 d[i - 1][j - 1] + (a[i - 1] != b[j - 1]),
733 # Transpose:
734 (
735 d[i - 2][j - 2] + (a[i - 1] != b[j - 1])
736 if i > 2 and j > 2 and a[i - 2 : i] == b[j - 1 : j - 3 : -1]
737 else math.inf
738 ),
739 )
741 return d[-1][-1]
class Completer(abc.ABC):
    """
    An interface for text completion providers.

    """

    def complete(
        self,
        text: str,
        pos: int,
        /,
        *,
        do_corrections: bool = True,
        derive_common_prefix: bool = True,
    ) -> list[Completion]:
        """
        Complete the given text at the given cursor position.

        :param text:
            text that is being completed.
        :param pos:
            position of the cursor in the text. ``0`` means the cursor
            is before the first character, ``len(text)`` means the cursor
            is after the last character.
        :param do_corrections:
            if :data:`True` (default), completion system will try to guess
            if there are any misspellings in the `text`, and offer to correct them.
            Corrections are only attempted when there are no exact completions.
        :param derive_common_prefix:
            if :data:`True` (default), and all returned completions have a non-empty
            common prefix, return a single completion with this prefix instead.
        :returns:
            a sorted list of completions.

            If all completions start with a common prefix, a single completion
            is returned containing this prefix.

        """

        # First pass: only completions that match the entered prefix.
        collector = CompletionCollector(text, pos)
        with collector.save_state():
            self._process(collector)
        completions = collector.finalize(derive_common_prefix=derive_common_prefix)
        if completions or not do_corrections:
            return completions

        # Second pass: nothing matched, so look for near misses.
        collector = _CorrectingCollector(text, pos)
        with collector.save_state():
            self._process(collector)
        return collector.finalize(derive_common_prefix=derive_common_prefix)

    @abc.abstractmethod
    def _process(self, collector: CompletionCollector, /):
        """
        Generate completions and add them to the given collector.

        Implementing this method is straightforward, just feed all possible
        completions to the collector. For example, let's implement a completer
        for environment variables:

        .. code-block:: python

            class EnvVarCompleter(Completer):
                def _process(self, collector: CompletionCollector):
                    for var in os.environ.keys():
                        collector.add(var)

        """

        raise NotImplementedError()

    def _get_completion_model(
        self, *, is_many: bool = False
    ) -> _OptionSerializer.Model:
        """
        Internal, do not use.

        """

        return _OptionSerializer.CustomCompleter(self)
class Empty(Completer):
    """
    A completer that never offers any completions.

    """

    def _process(self, collector: CompletionCollector):
        # Deliberately leaves the collector untouched.
        return None

    def _get_completion_model(
        self, *, is_many: bool = False
    ) -> _OptionSerializer.Model:
        # The base model carries no data.
        empty_model = _OptionSerializer.Model()
        return empty_model
840@dataclass(frozen=True, slots=True)
841class Option:
842 """
843 A single completion option for the :class:`Choice` completer.
845 """
847 completion: str
848 """
849 This string will replace an element that is being completed.
851 """
853 comment: str | None = None
854 """
855 Short comment displayed alongside the completion.
857 """
class Choice(Completer):
    """
    Offers completions from a fixed collection of options.

    :param choices:
        options to choose completion from.

    """

    def __init__(self, choices: _t.Collection[Option], /):
        self._choices: _t.Collection[Option] = choices

    def _process(self, collector: CompletionCollector, /):
        # The collector decides which options match the entered text.
        for option in self._choices:
            collector.add(option.completion, comment=option.comment)

    def _get_completion_model(
        self, *, is_many: bool = False
    ) -> _OptionSerializer.Model:
        has_comments = any(option.comment for option in self._choices)
        if not has_comments:
            # No option has a comment, so use the plain model.
            return _OptionSerializer.Choice(
                [option.completion for option in self._choices]
            )

        described = [
            (option.completion, option.comment or "") for option in self._choices
        ]
        return _OptionSerializer.ChoiceWithDesc(described)
class Alternative(Completer):
    """
    Combines completions produced by several completers.

    :param completers:
        list of inner completers.

        This is a list of tuples. First tuple element is a description of a completion
        group. It will be displayed when this completer is used in shells
        that support it (namely, ZSH). Second tuple element is the inner completer
        itself.

    """

    def __init__(self, completers: list[tuple[str, Completer]], /):
        self._completers = completers

    def _process(self, collector: CompletionCollector, /):
        for pair in self._completers:
            inner = pair[1]
            # Changes one inner completer makes to the collector's fields
            # are rolled back before the next one runs.
            with collector.save_state():
                collector.add_group()
                inner._process(collector)

    def _get_completion_model(
        self, *, is_many: bool = False
    ) -> _OptionSerializer.Model:
        alternatives = []
        for name, completer in self._completers:
            model = completer._get_completion_model(is_many=is_many)
            alternatives.append((name, model))
        return _OptionSerializer.Alternative(alternatives)
class List(Completer):
    """
    Completes a list of elements separated by a delimiter.

    :param inner:
        completer for list items.
    :param delimiter:
        a character that separates list items. :data:`None` separates by any whitespace
        character, similar to :meth:`str.split`.
    :param allow_duplicates:
        whether to show completions that already appear in the list.
    :raises ValueError:
        if `delimiter` is an empty string.

    """

    def __init__(
        self,
        inner: Completer,
        /,
        *,
        delimiter: str | None = None,
        allow_duplicates: bool = False,
    ):
        if delimiter == "":
            raise ValueError("empty delimiter")

        self._inner = inner
        self._delimiter = delimiter
        self._allow_duplicates = allow_duplicates

    def _process(self, collector: CompletionCollector, /):
        delimiter = self._delimiter

        # Narrow the collector down to the element under the cursor.
        collector.split_off_prefix(delimiter)
        collector.split_off_suffix(delimiter)
        collector.rsuffix = delimiter or " "
        collector.rsymbols += delimiter or string.whitespace

        if self._allow_duplicates:
            collector.dedup_words = frozenset()
        else:
            # Every other element of the list is excluded from completions.
            seen = {
                *collector.iprefix.split(delimiter),
                *collector.isuffix.split(delimiter),
            }
            # The element being edited may still complete to itself.
            seen.discard(collector.text)
            collector.dedup_words = frozenset(seen)

        self._inner._process(collector)

    def _get_completion_model(
        self, *, is_many: bool = False
    ) -> _OptionSerializer.Model:
        model_type = _OptionSerializer.ListMany if is_many else _OptionSerializer.List
        return model_type(self._delimiter or " ", self._inner._get_completion_model())
class Tuple(Completer):
    """
    Completes a value-separated tuple of elements.

    Each position of the tuple is completed by its own completer; positions
    past the last completer get no completions.

    :param inner:
        completers for each tuple element.
    :param delimiter:
        a character that separates list items. :data:`None` separates by any whitespace
        character, similar to :meth:`str.split`.
    :raises ValueError:
        if `delimiter` is an empty string.

    """

    def __init__(self, *inner: Completer, delimiter: str | None = None):
        self._inner = inner
        if delimiter == "":
            raise ValueError("empty delimiter")
        self._delimiter = delimiter

    def _process(self, collector: CompletionCollector, /):
        # One-based number of the element under the cursor; zero when
        # the prefix has no elements at all (whitespace-separated mode only).
        pos = len(collector.prefix.split(self._delimiter))
        if (
            pos
            and self._delimiter is None
            and collector.prefix
            and collector.prefix[-1] in string.whitespace
        ):
            # `.split(None)` will trim whitespaces at the end.
            # Make sure we count those towards the current position in the tuple.
            pos += 1

        index = max(pos - 1, 0)
        if index >= len(self._inner):
            # Either the cursor is past the last element, or this tuple
            # has no element completers at all.
            return

        collector.split_off_prefix(self._delimiter)
        collector.split_off_suffix(self._delimiter)
        collector.rsuffix = self._delimiter or " "
        collector.rsymbols += self._delimiter or string.whitespace

        self._inner[index]._process(collector)

    def _get_completion_model(
        self, *, is_many: bool = False
    ) -> _OptionSerializer.Model:
        if is_many:
            return _OptionSerializer.TupleMany(
                self._delimiter or " ",
                [inner._get_completion_model() for inner in self._inner],
            )
        else:
            return _OptionSerializer.Tuple(
                self._delimiter or " ",
                [inner._get_completion_model() for inner in self._inner],
            )
class File(Completer):
    """
    Completes file paths.

    Directories are always offered; files are offered when they match
    one of the allowed extensions.

    :param extensions:
        allowed file extensions, should include the leading dot.
        :data:`None` (default) allows all files.

    """

    def __init__(self, extensions: str | _t.Collection[str] | None = None):
        if isinstance(extensions, str):
            self._extensions = [extensions]
        elif extensions is not None:
            self._extensions = list(extensions)
        else:
            self._extensions = None

    def _process(self, collector: CompletionCollector, /):
        # Everything up to the last path separator is fixed; only the last
        # path component is being completed.
        base, name = os.path.split(collector.prefix)
        if base and not base.endswith(os.path.sep):
            base += os.path.sep
        collector.iprefix += base
        collector.prefix = name
        collector.suffix = collector.suffix.split(os.sep, maxsplit=1)[0]

        try:
            resolved = pathlib.Path(base).expanduser().resolve()
        except (OSError, RuntimeError):
            # `expanduser` raises `RuntimeError` for an unknown `~user`,
            # and `resolve` may fail on broken paths. There's nothing to offer.
            return
        if not resolved.is_dir():
            return

        rsuffix = collector.rsuffix

        if name.startswith("."):
            collector.rsuffix = ""
            collector.add(os.path.curdir + os.path.sep, color_tag="dir")
            collector.add(os.path.pardir + os.path.sep, color_tag="dir")
        if name.startswith("~"):
            collector.rsuffix = ""
            collector.add("~" + os.path.sep, color_tag="dir")

        try:
            for path in resolved.iterdir():
                if path.is_dir():
                    if path.is_symlink():
                        color_tag = "symlink"
                        dsuffix = "@"
                    else:
                        color_tag = "dir"
                        dsuffix = ""
                    # No list separator after a directory: the user will
                    # likely continue typing the path.
                    collector.rsuffix = ""
                    collector.add(
                        path.name + os.sep, color_tag=color_tag, dsuffix=dsuffix
                    )
                elif self._extensions is None or any(
                    path.name.endswith(ext) for ext in self._extensions
                ):
                    collector.rsuffix = rsuffix
                    color_tag, dsuffix = self._classify(path)
                    collector.add(path.name, color_tag=color_tag, dsuffix=dsuffix)
        except OSError:
            # Directory can't be listed (no permissions, removed meanwhile, etc.);
            # keep whatever was collected so far.
            return

    @staticmethod
    def _classify(path: pathlib.Path) -> tuple[str | None, str]:
        # Pick a color tag and a display suffix for a non-directory entry.
        if path.is_symlink():
            return "symlink", "@"
        if path.is_file():
            if os.name == "nt":
                executable = path.suffix == ".exe"
            else:
                executable = os.access(path, os.X_OK)
            return ("exec", "*") if executable else ("file", "")
        if path.is_socket():
            return "socket", "="
        if path.is_fifo():
            return "pipe", "|"
        if path.is_block_device():
            return "block_device", "#"
        if path.is_char_device():
            return "char_device", "%"
        return None, ""

    def _get_completion_model(
        self, *, is_many: bool = False
    ) -> _OptionSerializer.Model:
        return _OptionSerializer.File(
            "|".join(extension.lstrip(".") for extension in self._extensions or [])
        )
class Dir(File):
    """
    Completes directory paths only.

    """

    def __init__(self):
        # An empty list of extensions matches no file, so only directories remain.
        super().__init__(extensions=[])

    def _get_completion_model(
        self, *, is_many: bool = False
    ) -> _OptionSerializer.Model:
        dir_model = _OptionSerializer.Dir()
        return dir_model
1142class _CustomCompleterRegistrar:
1143 def __init__(self) -> None:
1144 self._custom_completer_index = 0
1146 def _register_custom_completer(self) -> int:
1147 index = self._custom_completer_index
1148 self._custom_completer_index += 1
1149 return index
1152class _OptionSerializer(_CustomCompleterRegistrar):
1153 _SPECIAL_SYMBOLS = str.maketrans("\r\n\a\b\t", " ")
1155 def __init__(
1156 self,
1157 flags: list[str],
1158 path: str,
1159 nargs: str | int,
1160 metavar: str | tuple[str, ...],
1161 help: str | yuio.Disabled,
1162 ):
1163 super().__init__()
1165 self._flags = flags
1166 self._path = path
1167 self._nargs = nargs
1168 self._metavar = metavar if isinstance(metavar, tuple) else (metavar,)
1169 self._help: str | yuio.Disabled = help
1171 def dump(self, model: _OptionSerializer.Model):
1172 if self._help is yuio.DISABLED:
1173 desc = "__yuio_hide__"
1174 else:
1175 desc = self._process_help(self._help)
1177 compspec = [
1178 self._path,
1179 " ".join(self._flags),
1180 desc,
1181 " ".join(
1182 re.sub(
1183 r"[\\ ]",
1184 lambda s: "\\S" if s.group() == " " else "\\L",
1185 str(m),
1186 )
1187 or ""
1188 for m in self._metavar
1189 ),
1190 str(self._nargs),
1191 *model.dump(self),
1192 ]
1194 return "\t".join(item.translate(self._SPECIAL_SYMBOLS) for item in compspec)
1196 @staticmethod
1197 def _process_help(help: str):
1198 if (
1199 len(help) > 2
1200 and help[0].isupper()
1201 and (help[1].islower() or help[1].isspace())
1202 ):
1203 help = help[0].lower() + help[1:]
1204 if help.endswith(".") and not help.endswith(".."):
1205 help = help[:-1]
1206 if (index := help.find("\n\n")) != -1:
1207 help = help[:index]
1208 return yuio.string.strip_color_tags(help)
1210 @staticmethod
1211 def _dump_nested(compspec: _t.Iterable[object], s: _OptionSerializer) -> list[str]:
1212 contents = []
1214 for item in compspec:
1215 contents.extend(_OptionSerializer._dump_nested_item(item, s))
1217 return contents
1219 @staticmethod
1220 def _dump_nested_item(item: object, s: _OptionSerializer) -> list[str]:
1221 contents = []
1223 if isinstance(item, _OptionSerializer.Model):
1224 contents.extend(item.dump(s))
1225 elif isinstance(item, list):
1226 contents.append(str(len(item)))
1227 for sub_item in item:
1228 contents.extend(_OptionSerializer._dump_nested_item(sub_item, s))
1229 elif isinstance(item, tuple):
1230 for sub_item in item:
1231 contents.extend(_OptionSerializer._dump_nested_item(sub_item, s))
1232 else:
1233 contents.append(str(item))
1235 return contents
1237 @dataclass
1238 class Model:
1239 tag: typing.ClassVar[str] = "-"
1241 def __init_subclass__(cls, tag: str = "-", **kwargs):
1242 super().__init_subclass__(**kwargs)
1243 cls.tag = tag
1245 def dump(self, s: _OptionSerializer) -> list[str]:
1246 contents = _OptionSerializer._dump_nested(
1247 (getattr(self, field.name) for field in dataclasses.fields(self)), s
1248 )
1249 return [self.tag, str(len(contents)), *contents]
1251 def get_completer_at_index(
1252 self, s: _CustomCompleterRegistrar, index: int
1253 ) -> Completer | None:
1254 return None
1256 @dataclass
1257 class File(Model, tag="f"):
1258 ext: str
1260 @dataclass
1261 class Dir(Model, tag="d"):
1262 pass
1264 @dataclass
1265 class Choice(Model, tag="c"):
1266 choices: list[str]
1268 def dump(self, s: _OptionSerializer) -> list[str]:
1269 return [self.tag, str(len(self.choices)), *self.choices]
1271 @dataclass
1272 class ChoiceWithDesc(Model, tag="cd"):
1273 choices: list[tuple[str, str]]
1275 def dump(self, s: _OptionSerializer) -> list[str]:
1276 return [
1277 self.tag,
1278 str(len(self.choices) * 2),
1279 *[c[0] for c in self.choices],
1280 *[s._process_help(c[1]) for c in self.choices],
1281 ]
1283 @dataclass
1284 class Git(Model, tag="g"):
1285 modes: str
@dataclass
class List(Model, tag="l"):
    """
    Completion model serialized with tag ``l``: a delimiter and a nested model.

    """

    delim: str
    inner: _OptionSerializer.Model

    def get_completer_at_index(
        self, s: _CustomCompleterRegistrar, index: int
    ) -> Completer | None:
        """
        Delegate the lookup to the nested model.

        """
        nested = self.inner
        return nested.get_completer_at_index(s, index)
@dataclass
class ListMany(List, tag="lm"):
    """
    Same fields and behavior as ``List``, but serialized with tag ``lm``.

    """
@dataclass
class Tuple(Model, tag="t"):
    """
    Completion model serialized with tag ``t``: a delimiter and a list
    of nested models.

    """

    delim: str
    inner: list[_OptionSerializer.Model]

    def get_completer_at_index(
        self, s: _CustomCompleterRegistrar, index: int
    ) -> Completer | None:
        """
        Query nested models in order and return the first completer found.

        Models after the first match are not queried.

        """
        for model in self.inner:
            found = model.get_completer_at_index(s, index)
            if found:
                return found
        return None
@dataclass
class TupleMany(Tuple, tag="tm"):
    """
    Same fields and behavior as ``Tuple``, but serialized with tag ``tm``.

    """
@dataclass
class Alternative(Model, tag="a"):
    """
    Completion model serialized with tag ``a``: a list of alternatives.

    """

    # Pairs of ``(string, model)``; only the model takes part in lookups.
    alternatives: list[tuple[str, _OptionSerializer.Model]]

    def get_completer_at_index(
        self, s: _CustomCompleterRegistrar, index: int
    ) -> Completer | None:
        """
        Query models of all alternatives in order and return the first
        completer found.

        Alternatives after the first match are not queried.

        """
        for _, model in self.alternatives:
            found = model.get_completer_at_index(s, index)
            if found:
                return found
        return None
@dataclass
class CustomCompleter(Model, tag="cc"):
    """
    Completion model serialized with tag ``cc``; wraps a completer object.

    """

    completer: Completer

    def dump(self, s: _OptionSerializer) -> list[str]:
        """
        Serialize as ``[tag, "1", <json>]``, where the JSON object holds
        the serializer's ``path`` and ``flags`` and an ``index`` obtained
        from ``s._register_custom_completer()``.

        """
        payload = {
            "path": s._path,
            "flags": s._flags,
            "index": s._register_custom_completer(),
        }
        return [self.tag, "1", json.dumps(payload)]

    def get_completer_at_index(
        self, s: _CustomCompleterRegistrar, index: int
    ) -> Completer | None:
        """
        Request the next index from the registrar and return the wrapped
        completer if that index equals ``index``, otherwise ``None``.

        """
        return self.completer if s._register_custom_completer() == index else None
1357class _ProgramSerializer:
1358 def __init__(self, path: str = "") -> None:
1359 self._path = path
1360 self._lines: list[str] = []
1361 self._positionals = 0
1362 self._subcommands: dict[
1363 str, tuple[_ProgramSerializer, bool, str | yuio.Disabled]
1364 ] = {}
1366 def add_option(
1367 self,
1368 flags: list[str] | yuio.Positional,
1369 nargs: str | int,
1370 metavar: str | tuple[str, ...],
1371 help: str | yuio.Disabled,
1372 completer: Completer | None,
1373 is_many: bool,
1374 ):
1375 if flags is yuio.POSITIONAL:
1376 flags = [str(self._positionals)]
1377 self._positionals += 1
1378 if completer is None:
1379 model = _OptionSerializer.Model()
1380 else:
1381 model = completer._get_completion_model(is_many=is_many)
1382 self._add_option(flags, nargs, metavar, help, model)
1384 def _add_option(
1385 self,
1386 flags: list[str],
1387 nargs: str | int,
1388 metavar: str | tuple[str, ...],
1389 help: str | yuio.Disabled,
1390 model: _OptionSerializer.Model,
1391 ):
1392 self._lines.append(
1393 _OptionSerializer(flags, self._path, nargs, metavar, help).dump(model)
1394 )
1396 def add_subcommand(
1397 self,
1398 name: str,
1399 is_alias: bool,
1400 help: str | yuio.Disabled,
1401 ):
1402 serializer = _ProgramSerializer(f"{self._path}/{name}")
1403 self._subcommands[name] = (serializer, is_alias, help)
1404 return serializer
1406 def _dump(self):
1407 if self._subcommands:
1408 self._add_option(
1409 ["c"],
1410 1,
1411 "<subcommand>",
1412 "Subcommand.",
1413 _OptionSerializer.ChoiceWithDesc(
1414 [
1415 (name, help)
1416 for name, (_, is_alias, help) in self._subcommands.items()
1417 if not is_alias and help is not yuio.DISABLED
1418 ]
1419 ),
1420 )
1422 for _, (serializer, _, _) in self._subcommands.items():
1423 self._lines.extend(serializer._dump())
1425 return self._lines
1427 def dump(self):
1428 return "\n".join(self._dump())
1431_PROG_ESCAPE = str.maketrans(
1432 string.punctuation + string.whitespace,
1433 "_" * (len(string.punctuation) + len(string.whitespace)),
1434)
def _run_completer_at_index(completer: Completer, is_many: bool, index: int, word: str):
    """
    Run the custom completer registered under ``index`` and print
    its completions.

    The completion model of ``completer`` is searched for a custom completer
    with the given index. If there is none, nothing is printed. Otherwise,
    the found completer is run on ``word`` with the cursor at its end,
    and each completion is printed to ``sys.__stdout__`` as
    ``<iprefix><completion><isuffix>\\t<comment>``.

    :param completer:
        root completer of an option.
    :param is_many:
        forwarded to ``completer._get_completion_model``.
    :param index:
        index of the custom completer to run.
    :param word:
        text to complete.

    """
    registrar = _CustomCompleterRegistrar()
    model = completer._get_completion_model(is_many=is_many)
    completer_at_index = model.get_completer_at_index(registrar, index)
    if not completer_at_index:
        return

    # Run the completer that was found, not the root one: for nested models
    # these are different objects.
    #
    # It's up to user's shell to do corrections and derive common prefix.
    completions = completer_at_index.complete(
        word, len(word), do_corrections=False, derive_common_prefix=False
    )
    for completion in completions:
        print(
            f"{completion.iprefix}{completion.completion}{completion.isuffix}\t{completion.comment or ''}",
            file=sys.__stdout__,
        )
def _write_completions(compdata: str, prog: str | None = None, shell: str = "all"):
    """
    Install or uninstall shell completions for a program.

    Exits with status 1 if the program was started as ``__main__``, or if its
    escaped name is empty or contains forbidden characters.

    :param compdata:
        completion data; written verbatim to
        ``<data home>/yuio/<escaped prog>.compdata.tsv`` on install.
    :param prog:
        program name; defaults to the stem of ``sys.argv[0]``.
    :param shell:
        ``"bash"``, ``"zsh"``, ``"fish"`` or ``"pwsh"`` to install completions
        for one shell, ``"all"`` to install for every shell,
        ``"uninstall"`` to remove completions for every shell.

    """
    import yuio.io

    # `true_prog` is the command as the user types it; `prog` is its escaped
    # form, used for the completion data file name and for script templates.
    true_prog = prog or pathlib.Path(sys.argv[0]).stem
    prog = true_prog.translate(_PROG_ESCAPE)

    if pathlib.Path(sys.argv[0]).stem == "__main__":
        yuio.io.failure(
            "You've invoked this program as a python module, most likely with "
            "`python -m <module>`. For completions to work, the program "
            "must be invoked as a command in your `$PATH`"
        )
        sys.exit(1)
    if not prog:
        yuio.io.failure("Failed to generate completion because program name is empty")
        sys.exit(1)
    if not re.match(r"^[a-zA-Z0-9_-]+$", prog):
        yuio.io.failure(
            "Failed to generate completion due to "
            "forbidden characters in program name: `%r`",
            prog,
        )
        sys.exit(1)

    if shell == "uninstall":
        shell = "all"
        yuio.io.heading("Uninstalling completions for `%s`", true_prog)
        install = False
    else:
        yuio.io.heading("Generating completions for `%s`", true_prog)
        install = True

        if not shutil.which(true_prog):
            yuio.io.warning(
                "Program `%s` is not in your `$PATH`. Completions might not be able "
                "to initialize",
                true_prog,
            )

    if os.name == "nt":
        data_home = cache_home = config_home = pathlib.Path(
            os.environ.get("LOCALAPPDATA") or (pathlib.Path.home() / "AppData/Local")
        )
    else:
        data_home = pathlib.Path(
            os.environ.get("XDG_DATA_HOME") or (pathlib.Path.home() / ".local/share")
        )
        cache_home = pathlib.Path(
            os.environ.get("XDG_CACHE_HOME") or (pathlib.Path.home() / ".cache")
        )
        config_home = pathlib.Path(
            os.environ.get("XDG_CONFIG_HOME") or (pathlib.Path.home() / ".config")
        )

    compdata_path = data_home / f"yuio/{prog}.compdata.tsv"

    if install:
        task_heading = "Installing completions"
    else:
        task_heading = "Uninstalling completions"

    # Shell key, title shown in the task comment, and the function that
    # installs or removes the script; processed in this order.
    shell_writers = [
        ("bash", "Bash", _write_bash_script),
        ("zsh", "Zsh", _write_zsh_script),
        ("fish", "Fish", _write_fish_script),
        ("pwsh", "PowerShell", _write_pwsh_script),
    ]

    with yuio.io.Task(task_heading) as t:
        if install:
            os.makedirs(data_home / "yuio", exist_ok=True)
            compdata_path.write_text(compdata)
            yuio.io.info(
                "Wrote <c note>completion data</c> to <c path>%s</c>", compdata_path
            )
        elif compdata_path.exists():
            os.remove(compdata_path)
            yuio.io.info("Removed <c path>%s</c>", compdata_path)

        for shell_key, shell_title, write_shell_script in shell_writers:
            if shell in ("all", shell_key):
                t.comment(shell_title)
                write_shell_script(
                    prog,
                    true_prog,
                    install,
                    compdata_path,
                    data_home,
                    cache_home,
                    config_home,
                )

    yuio.io.success("All done! Please restart your shell for changes to take effect.")
    if install:
        # Show the real command name here: the escaped one may not be
        # something the user can run (`my-app` is escaped to `my_app`).
        yuio.io.info(
            "Run `%s --completions uninstall` to undo all changes.", true_prog
        )
def _write_bash_script(
    prog: str,
    true_prog: str,
    install: bool,
    compdata_path: pathlib.Path,
    data_home: pathlib.Path,
    cache_home: pathlib.Path,
    config_home: pathlib.Path,
):
    """
    Install or remove the Bash completion script for ``true_prog``.

    The script is named after ``true_prog`` and lives in the user's
    bash-completion directory. That directory is queried from a login bash
    shell, with ``<data_home>/bash-completion/completions/`` as a fallback.
    Does nothing on Windows; an install is skipped when ``bash`` is missing.

    ``cache_home`` and ``config_home`` are not used here.

    """
    import yuio.exec
    import yuio.io

    if os.name == "nt":
        yuio.io.warning(
            "Skipped <c note>Bash</c>: completion script doesn't support windows"
        )
        return

    if install and not shutil.which("bash"):
        yuio.io.warning("Skipped <c note>Bash</c>: `bash` command is not available")
        return

    try:
        shell_output = yuio.exec.exec(
            "bash",
            "-lc",
            'echo -n "${BASH_COMPLETION_USER_DIR:-${XDG_DATA_HOME:-$HOME/.local/share}/bash-completion}/completions/"',
        )
        # Only the last line of output is taken as the answer.
        completions_dir = pathlib.Path(shell_output.splitlines()[-1])
    except (subprocess.CalledProcessError, IndexError, FileNotFoundError):
        completions_dir = pathlib.Path(data_home / "bash-completion/completions/")

    script_dest = completions_dir / true_prog

    if install:
        _write_script(script_dest, "complete.bash", prog, true_prog, compdata_path)
        yuio.io.info("Wrote <c note>Bash</c> script to <c path>%s</c>", script_dest)
        return

    if script_dest.exists():
        os.remove(script_dest)
        yuio.io.info("Removed <c path>%s</c>", script_dest)
def _write_zsh_script(
    prog: str,
    true_prog: str,
    install: bool,
    compdata_path: pathlib.Path,
    data_home: pathlib.Path,
    cache_home: pathlib.Path,
    config_home: pathlib.Path,
):
    """
    Install or remove the Zsh completion script for ``true_prog``.

    The script is stored as ``<data_home>/zsh/completions/_<true_prog>``.
    On install, if this directory is not in ``$FPATH`` of a login zsh shell,
    a line that adds it is appended to ``.zprofile``; this line is never
    removed on uninstall. Whenever a script was written or removed, known
    ``zcompdump`` cache files are deleted and zsh is started once.

    Does nothing on Windows; an install is skipped when ``zsh`` is missing.
    ``config_home`` is not used here.

    """
    import yuio.exec
    import yuio.io

    if os.name == "nt":
        yuio.io.warning(
            "Skipped <c note>Zsh</c>: completion script doesn't support windows"
        )
        return

    if install and not shutil.which("zsh"):
        yuio.io.warning("Skipped <c note>Zsh</c>: `zsh` command is not available")
        return

    needs_cache_cleanup = False

    zsh_completions_home = data_home / "zsh/completions"

    # Create the directory only when installing: uninstalling should not
    # leave new directories behind.
    if install and not zsh_completions_home.exists():
        zsh_completions_home.mkdir(parents=True)
        # Completions home needs rwxr-xr-x, otherwise zsh will not load
        # our completion scripts.
        zsh_completions_home.chmod(mode=0o755)

    script_dest = zsh_completions_home / ("_" + true_prog)

    if install:
        needs_cache_cleanup = True
        _write_script(script_dest, "complete.zsh", prog, true_prog, compdata_path)
        yuio.io.info("Wrote <c note>Zsh</c> script to <c path>%s</c>", script_dest)
    elif script_dest.exists():
        needs_cache_cleanup = True

        os.remove(script_dest)
        yuio.io.info("Removed <c path>%s</c>", script_dest)

    try:
        fpath = (
            yuio.exec.exec(
                "zsh",
                "-lc",
                "echo -n $FPATH",
            )
            .splitlines()[-1]
            .split(":")
        )
    except (subprocess.CalledProcessError, IndexError, FileNotFoundError):
        fpath = []

    try:
        zhome = yuio.exec.exec(
            "zsh",
            "-lc",
            "echo -n ${ZDOTDIR:-$HOME}",
        ).splitlines()[-1]
    except (subprocess.CalledProcessError, IndexError, FileNotFoundError):
        zhome = pathlib.Path.home()

    zhome = pathlib.Path(zhome)
    zprofile_path = zhome / ".zprofile"
    # NOTE(review): the path is written unquoted; a completions home with
    # spaces would presumably break this line -- confirm.
    zprofile_append_text = f"\nfpath=({zsh_completions_home} $fpath)\n"

    if install:
        if str(zsh_completions_home) not in fpath:
            with open(zprofile_path, "a") as f:
                f.write(zprofile_append_text)
            yuio.io.info(
                "<c note>Note:</c> modified <c path>%s</c> to add <c path>%s</c> to `fpath`",
                zprofile_path,
                zsh_completions_home,
            )
    elif zprofile_path.exists():
        zprofile_text = zprofile_path.read_text()
        if zprofile_append_text in zprofile_text:
            yuio.io.info(
                "<c note>Note:</c> modifications to <c path>%s</c> are not removed"
                " because other completions might rely on them",
                zprofile_path,
            )

    if not needs_cache_cleanup:
        return

    # Try to remove completions cache from the most common places.
    for zcomp_basedir in [zhome, cache_home / "prezto"]:
        if not zcomp_basedir.exists() or not zcomp_basedir.is_dir():
            continue
        for file in zcomp_basedir.iterdir():
            if file.is_file() and re.match(r"^\.?zcompdump", file.name):
                os.remove(file)
                yuio.io.info(
                    "<c note>Note:</c> deleted Zsh completions cache at <c path>%s</c>",
                    file,
                )

    try:
        # Run zsh with the right flags in case zshrc runs compinit.
        # If after generating completions user runs `zsh` without the `-l` flag,
        # our changes to fpath will not be visible, and compinit will dump
        # an invalid version of cache. To avoid this, we call zsh ourselves
        # before the user has a chance to do it. Notice, though, that we don't
        # run `compdump`. This is because we can't be sure that the user uses
        # the default cache path (~/.zcompdump).
        yuio.exec.exec("zsh", "-lc", "true")
    except (subprocess.CalledProcessError, FileNotFoundError):
        pass
def _write_fish_script(
    prog: str,
    true_prog: str,
    install: bool,
    compdata_path: pathlib.Path,
    data_home: pathlib.Path,
    cache_home: pathlib.Path,
    config_home: pathlib.Path,
):
    """
    Install or remove the Fish completion script for ``true_prog``.

    The script is stored as
    ``<data_home>/fish/vendor_completions.d/<true_prog>.fish``.
    Does nothing on Windows; an install is skipped when ``fish`` is missing.

    ``cache_home`` and ``config_home`` are not used here.

    """
    import yuio.io

    if os.name == "nt":
        yuio.io.warning(
            "Skipped <c note>Fish</c>: completion script doesn't support windows"
        )
        return

    if install and not shutil.which("fish"):
        yuio.io.warning("Skipped <c note>Fish</c>: `fish` command is not available")
        return

    script_dest = data_home / "fish/vendor_completions.d" / (true_prog + ".fish")

    if install:
        _write_script(script_dest, "complete.fish", prog, true_prog, compdata_path)
        yuio.io.info("Wrote <c note>Fish</c> script to <c path>%s</c>", script_dest)
        return

    if script_dest.exists():
        os.remove(script_dest)
        yuio.io.info("Removed <c path>%s</c>", script_dest)
def _write_pwsh_script(
    prog: str,
    true_prog: str,
    install: bool,
    compdata_path: pathlib.Path,
    data_home: pathlib.Path,
    cache_home: pathlib.Path,
    config_home: pathlib.Path,
):
    """
    Install or remove the PowerShell completion script for ``true_prog``.

    The script is stored as ``<data_home>/yuio/pwsh/_<true_prog>.ps1``, next
    to a shared ``LoadCompletions.ps1`` loader. On install, if PowerShell
    does not already report this directory in ``$_YUIO_COMPL_V1_INIT_PATHS``,
    a line that dot-sources the loader is appended to ``$PROFILE``; this line
    is never removed on uninstall.

    Skipped when neither ``pwsh`` nor ``powershell`` is available, when
    the version or the profile path can't be determined, or when PowerShell
    is older than version 5.

    ``cache_home`` and ``config_home`` are not used here.

    """
    import yuio.exec
    import yuio.io

    if shutil.which("pwsh"):
        command = "pwsh"
    elif shutil.which("powershell"):
        command = "powershell"
    else:
        yuio.io.warning(
            "Skipped <c note>PowerShell</c>: `pwsh` command is not available"
        )
        return

    try:
        pwsh_data = (
            yuio.exec.exec(
                command,
                "-Command",
                'Write-Host "$($PSVersionTable.PSVersion);$PROFILE"',
            )
            .splitlines()[-1]
            .strip()
        )
    except (subprocess.CalledProcessError, IndexError) as e:
        yuio.io.warning(
            "Skipped <c note>PowerShell</c>: failed to get powershell `$PROFILE` path: %s",
            e,
        )
        return
    except FileNotFoundError:
        yuio.io.warning("Skipped <c note>PowerShell</c>: `pwsh` command not found")
        return
    if match := re.match(r"^(\d+(?:\.\d+)*);(.*)$", pwsh_data):
        version = match.group(1)
        profile_s = match.group(2)
    else:
        yuio.io.warning(
            "Skipped <c note>PowerShell</c>: can't determine powershell version"
        )
        return
    if not profile_s:
        yuio.io.warning(
            "Skipped <c note>PowerShell</c>: powershell `$PROFILE` path is empty"
        )
        return
    # Compare with a one-element tuple: a shorter tuple sorts before a longer
    # one with the same prefix, so `(5, 0) < (5, 0, 0)` would wrongly reject
    # a version reported as "5.0".
    if tuple(int(v) for v in version.split(".")) < (5,):
        yuio.io.warning(
            "Skipped <c note>PowerShell</c>: completions script requires "
            "PowerShell 5 or newer, you have %s",
            version,
        )
        return

    profile_path = pathlib.Path(profile_s).expanduser().resolve()
    if install:
        # The profile is only written on install; don't create directories
        # when uninstalling.
        profile_path.parent.mkdir(exist_ok=True, parents=True)

    data_dir = data_home / "yuio/pwsh"
    loader_path = data_dir / "LoadCompletions.ps1"
    script_dest = data_dir / f"_{true_prog}.ps1"
    if install:
        _write_script(script_dest, "complete.ps1", prog, true_prog, compdata_path)
        yuio.io.info(
            "Wrote <c note>PowerShell</c> script to <c path>%s</c>", script_dest
        )
        _write_pwsh_loader(loader_path, data_dir)
    elif script_dest.exists():
        os.remove(script_dest)
        yuio.io.info("Removed <c path>%s</c>", script_dest)

    try:
        # Directories reported by PowerShell in `$_YUIO_COMPL_V1_INIT_PATHS`,
        # one per output line.
        data_dirs = [
            pathlib.Path(f).expanduser().resolve()
            for f in yuio.exec.exec(
                command,
                "-Command",
                'Write-Host ($_YUIO_COMPL_V1_INIT_PATHS -join "`n")',
            )
            .strip()
            .splitlines()
        ]
    except (subprocess.CalledProcessError, FileNotFoundError):
        return

    # NOTE(review): the loader path is written unquoted; a path with spaces
    # would presumably break dot-sourcing -- confirm.
    pwsh_profile_append_text = f"\n. {loader_path}\n"

    if install:
        if data_dir not in data_dirs:
            with open(profile_path, "a") as f:
                f.write(pwsh_profile_append_text)
            yuio.io.info(
                "<c note>Note:</c> modified <c path>%s</c> to call <c path>%s</c> on startup",
                profile_path,
                loader_path,
            )
    elif profile_path.exists():
        pwsh_profile_text = profile_path.read_text()
        if pwsh_profile_append_text in pwsh_profile_text:
            yuio.io.info(
                "<c note>Note:</c> modifications to <c path>%s</c> are not removed"
                " because other completions might rely on them",
                profile_path,
            )
def _write_script(
    path: pathlib.Path,
    script_name: str,
    prog: str,
    true_prog: str,
    compdata_path: pathlib.Path,
):
    """
    Render a completion script template and write it to ``path``.

    The template is loaded with ``_read_script(script_name)``. Placeholders
    ``@prog@``, ``@true_prog@``, ``@data@`` and ``@version@`` are replaced,
    in this order, with ``prog``, ``true_prog``, ``compdata_path``
    and Yuio's version. Parent directories of ``path`` are created as needed,
    and the resulting file gets mode ``0o755``.

    """
    script = _read_script(script_name)

    substitutions = (
        ("@prog@", prog),
        ("@true_prog@", true_prog),
        ("@data@", str(compdata_path)),
        ("@version@", yuio.__version__),
    )
    for placeholder, value in substitutions:
        script = script.replace(placeholder, value)

    path.parent.mkdir(exist_ok=True, parents=True)
    path.write_text(script)
    path.chmod(0o755)
def _read_script(script_name: str):
    """
    Return text of a script template from Yuio's ``_complete`` directory.

    When this module was loaded by a zip importer, the template is read
    from the archive; otherwise it is read from a directory next to this file.

    """
    import zipfile
    import zipimport

    if not isinstance(__loader__, zipimport.zipimporter):
        template_path = pathlib.Path(__file__).parent / "_complete" / script_name
        return template_path.read_text()

    # Yuio is imported directly from a wheel.
    with zipfile.ZipFile(__loader__.archive) as archive:
        raw_template = archive.read("yuio/_complete/" + script_name)
    return raw_template.decode()
def _write_pwsh_loader(loader_path: pathlib.Path, data_dir: pathlib.Path):
    """
    Write the PowerShell loader script to ``loader_path``, unless an up-to-date
    loader is already there.

    Versions are taken from ``# LOADER_VERSION: <number>`` lines. An existing
    loader is kept when its version is not lower than the template's one;
    a loader without a version line is overridden with a warning.

    :param loader_path:
        destination of the loader script.
    :param data_dir:
        value substituted for the ``@data@`` placeholder in the template.

    """
    import yuio.io

    version_pattern = r"^\s*#\s*LOADER_VERSION:\s*(\d+)\s*$"

    # Load the template through `_read_script`, so that it is also found
    # when Yuio is imported from a zip archive.
    loader_template = _read_script("complete_init.ps1")

    loader_version = re.search(version_pattern, loader_template, re.MULTILINE)
    # The template ships with Yuio and is expected to declare its version.
    assert loader_version

    if loader_path.exists() and loader_path.is_file():
        current_loader = loader_path.read_text()
        current_version_s = re.search(version_pattern, current_loader, re.MULTILINE)

        if current_version_s is None:
            yuio.io.warning(
                "<c note>Note:</c> can't determine version of <c path>%s</c>, "
                "file will be overridden",
                loader_path,
            )
        elif int(loader_version.group(1)) <= int(current_version_s.group(1)):
            # Installed loader is the same or newer, keep it.
            return

    loader_template = loader_template.replace("@data@", str(data_dir))
    loader_path.write_text(loader_template)
    loader_path.chmod(0o755)
    yuio.io.info("Wrote <c note>PowerShell</c> script to <c path>%s</c>", loader_path)