Coverage for yuio / complete.py: 74%
779 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-04 10:05 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-04 10:05 +0000
1# Yuio project, MIT license.
2#
3# https://github.com/taminomara/yuio/
4#
5# You're free to copy this file to your project and edit it for your needs,
6# just keep this copyright line please :3
8"""
9This module provides autocompletion functionality for widgets and CLI.
12Completer basics
13----------------
15All completers are derived from the :class:`Completer` base class
16with a simple interface:
18.. autoclass:: Completer
19 :members:
21.. autoclass:: Completion
22 :members:
25Completers
26----------
28Yuio provides basic completers that cover most of the cases:
30.. autoclass:: Empty
32.. autoclass:: Alternative
34.. autoclass:: Choice
36.. autoclass:: Option
37 :members:
39.. autoclass:: List
41.. autoclass:: Tuple
43.. autoclass:: File
45.. autoclass:: Dir
48Implementing your own completer
49-------------------------------
51To implement a custom completer, subclass :class:`Completer` and implement
52its :meth:`~Completer._process` method.
54.. note::
56 When using a custom completer for CLI flags in :mod:`yuio.app`,
57 completion script will invoke your program with special arguments
58 to run the completer and get its result.
60.. class:: Completer
61 :noindex:
63 .. automethod:: _process
65.. autoclass:: CompletionCollector
67"""
69from __future__ import annotations
71import abc
72import argparse
73import contextlib
74import dataclasses
75import enum
76import functools
77import json
78import math
79import os
80import pathlib
81import re
82import shutil
83import string
84import subprocess
85import sys
86import typing
87from dataclasses import dataclass
89import yuio
90import yuio.string
91from yuio import _typing as _t
93__all__ = [
94 "Alternative",
95 "Choice",
96 "Completer",
97 "Completion",
98 "CompletionCollector",
99 "Dir",
100 "Empty",
101 "File",
102 "List",
103 "Option",
104 "Tuple",
105]
108@dataclass(frozen=True, slots=True)
109@functools.total_ordering
110class Completion:
111 """
112 A single completion.
114 """
116 iprefix: str
117 """
118 See :class:`CompletionCollector.iprefix` for details.
120 """
122 completion: str
123 """
124 Text of the completion.
126 """
128 rsuffix: str
129 """
130 See :class:`CompletionCollector.rsuffix` for details.
132 """
134 rsymbols: str
135 """
136 See :class:`CompletionCollector.rsymbols` for details.
138 """
140 isuffix: str
141 """
142 See :class:`CompletionCollector.isuffix` for details.
144 """
146 comment: str | None
147 """
148 Short comment displayed alongside the completion.
150 """
152 dprefix: str
153 """
154 Prefix that will be displayed before :attr:`~Completion.completion`
155 when listing completions, but will not be inserted once completion
156 is applied.
158 """
160 dsuffix: str
161 """
162 Like :attr:`~Completion.dprefix`, but it's a suffix.
164 """
166 group_id: _t.SupportsLt[_t.Any] = dataclasses.field(repr=False)
167 """
168 Group id, used to sort completions.
170 Actual content of this property is an implementation detail.
172 """
174 group_color_tag: str | None
175 """
176 Color tag that's used when displaying this completion.
178 See :meth:`CompletionCollector.add_group` for details.
180 """
182 def __lt__(self, other: Completion) -> bool:
183 """
184 Completions are ordered by their groups and then alphabetically.
186 """
188 return self.group_id < other.group_id or (
189 self.group_id == other.group_id and self.completion < other.completion
190 )
193@dataclass(init=False, eq=False, repr=False, match_args=False)
194class CompletionCollector:
195 """
196 A class that collects completions as completers are running.
198 The text that is being completed is split into four parts, similar
199 to what you might see in ZSH completion widgets. The main two are:
201 .. autoattribute:: prefix
203 .. autoattribute:: suffix
205 When completions are added to the collector, they are checked against
206 the current prefix to determine if they match the entered text. If they
207 do, the completion system will replace text from ``prefix`` and ``suffix``
208 with the new completion string.
210 The two additional parts are:
212 .. autoattribute:: iprefix
214 .. autoattribute:: isuffix
216 For example, suppose you're completing a second element
217 of a colon-separated list. The list completer will set up
218 the collector so that ``prefix`` and ``suffix`` contain parts of the
219 current list element, while ``iprefix`` and ``isuffix`` contain
220 the rest of the elements:
222 .. code-block:: text
224 list_element_1:list_el|ement_2:list_element_3
225 └┬────────────┘└┬────┘│└┬────┘└┬────────────┘
226 iprefix prefix │ suffix isuffix
227 └ cursor
229 Now, if the completer adds a completion ``"list_elements"``,
230 this text will replace the ``prefix`` and ``suffix``, but not ``iprefix``
231 and ``isuffix``. So, after the completion is applied, the string will
232 look like so:
234 .. code-block:: text
236 list_element_1:list_elements:list_element_3
237 └┬──────────┘
238 this got replaced
240 Finally, there is ``rsuffix``:
242 .. autoattribute:: rsuffix
244 .. autoattribute:: rsymbols
246 So, when completing a colon-separated list, colons will be added and removed
247 automatically, similar to how ZSH does it.
249 .. autoattribute:: dedup_words
251 .. autoattribute:: full_prefix
253 .. autoattribute:: full_suffix
255 .. autoattribute:: text
257 .. autoattribute:: num_completions
259 .. automethod:: add
261 .. automethod:: add_group
263 .. automethod:: save_state
265 .. automethod:: split_off_prefix
267 .. automethod:: split_off_suffix
269 .. automethod:: finalize
271 """
273 iprefix: str
274 """
275 Contains text that goes before the :attr:`~CompletionCollector.prefix`.
277 This prefix is not considered when checking whether a completion
278 matches a text, and it is not replaced by the completion. It will also
279 not be shown in the table of completions.
281 This prefix starts empty, and then parts of :attr:`~CompletionCollector.prefix`
282 are moved to :attr:`~CompletionCollector.iprefix` as completers split it into
283 list elements.
285 """
287 prefix: str
288 """
289 Portion of the completed text before the cursor.
291 """
293 suffix: str
294 """
295 Portion of the completed text after the cursor.
297 """
299 rsuffix: str
300 """
301 Starts empty, and may be set to hold a list separator.
303 This suffix will be added after the completion. However, it will be automatically
304 removed if the user types one of :attr:`CompletionCollector.rsymbols`,
305 or moves cursor, or alters input in some other way.
307 This property is mutable and can be changed by completers.
309 """
311 rsymbols: str
312 """
313 If user types one of the symbols from this string,
314 :attr:`~.CompletionCollector.rsuffix` will be removed.
316 This property is mutable and can be changed by completers.
318 """
320 isuffix: str
321 """
322 Similar to :attr:`CompletionCollector.iprefix`, but for suffixes.
324 """
326 dedup_words: frozenset[str]
327 """
328 Completions from this set will not be added. This is useful
329 when completing lists of unique values.
331 This property is mutable and can be changed by completers.
333 """
335 # Internal fields.
336 _group_id: int
337 _group_sorted: bool
338 _group_color_tag: str | None
340 def __init__(self, text: str, pos: int, /):
341 self.iprefix = ""
342 self.prefix = text[:pos]
343 self.suffix = text[pos:]
344 self.rsuffix = ""
345 self.rsymbols = ""
346 self.isuffix = ""
347 self.dedup_words = frozenset()
349 self._group_id = 0
350 self._group_sorted = True
351 self._group_color_tag = None
353 self._completions: list[Completion] = []
355 @property
356 def full_prefix(self) -> str:
357 """
358 Portion of the final completed text that goes before the cursor.
360 """
362 return self.iprefix + self.prefix
364 @property
365 def full_suffix(self) -> str:
366 """
367 Portion of the final completed text that goes after the cursor.
369 """
371 return self.suffix + self.isuffix
373 @property
374 def text(self) -> str:
375 """
376 Portion of the text that is being autocompleted.
378 """
380 return self.prefix + self.suffix
382 @contextlib.contextmanager
383 def save_state(self):
384 """
385 Save current state of the collector, i.e. prefixes,
386 suffixes, etc., upon entering this context manager,
387 then restore state upon exiting.
389 Use this context manager when you need to call nested
390 completers more than once to prevent changes made in
391 one nested completer bleeding out into another
392 nested completer.
394 """
396 state = {f.name: getattr(self, f.name) for f in dataclasses.fields(self)}
398 try:
399 yield
400 finally:
401 for name, value in state.items():
402 setattr(self, name, value)
404 def add(
405 self,
406 completion: str,
407 /,
408 *,
409 comment: str | None = None,
410 dprefix: str = "",
411 dsuffix: str = "",
412 color_tag: str | None = None,
413 ):
414 """
415 Add a new completion.
417 :param completion:
418 completed text without :attr:`~CompletionCollector.iprefix`
419 and :attr:`~CompletionCollector.isuffix`. This text will replace
420 :attr:`~CompletionCollector.prefix` and :attr:`~CompletionCollector.suffix`.
421 :param comment:
422 additional comment that will be displayed near the completion.
423 :param color_tag:
424 allows overriding color tag from the group.
426 """
428 if (
429 completion
430 and completion not in self.dedup_words
431 and completion.startswith(self.prefix)
432 ):
433 self._add(
434 completion,
435 comment=comment,
436 dprefix=dprefix,
437 dsuffix=dsuffix,
438 color_tag=color_tag,
439 )
441 def _add(
442 self,
443 completion: str,
444 /,
445 *,
446 comment: str | None = None,
447 dprefix: str = "",
448 dsuffix: str = "",
449 color_tag: str | None = None,
450 ):
451 if not self.isuffix or self.isuffix[0] in string.whitespace:
452 # Only add `rsuffix` if we're at the end of an array element.
453 # Don't add `rsuffix` if we're in the middle of an array, unless the array
454 # is separated by spaces.
455 rsuffix = self.rsuffix
456 rsymbols = self.rsymbols
457 else:
458 rsuffix = ""
459 rsymbols = ""
461 if self._group_sorted:
462 group_id = (self._group_id, 0)
463 else:
464 group_id = (self._group_id, len(self._completions))
466 if color_tag is None:
467 color_tag = self._group_color_tag
469 self._completions.append(
470 Completion(
471 iprefix=self.iprefix,
472 completion=completion,
473 rsuffix=rsuffix,
474 rsymbols=rsymbols,
475 isuffix=self.isuffix,
476 comment=comment,
477 dprefix=dprefix,
478 dsuffix=dsuffix,
479 group_id=group_id,
480 group_color_tag=color_tag,
481 )
482 )
484 def add_group(self, /, *, sorted: bool = True, color_tag: str | None = None):
485 """
486 Add a new completions group.
488 All completions added after call to this method will be placed to the new group.
489 They will be grouped together, and colored according to the group's color tag.
491 :param sorted:
492 controls whether completions in the new group
493 should be sorted lexicographically.
494 :param color_tag:
495 which color tag should be used to display completions
496 and their help messages for this group.
498 See :attr:`yuio.widget.Option.color_tag` for details.
500 """
502 self._group_id += 1
503 self._group_sorted = sorted
504 self._group_color_tag = color_tag
506 @property
507 def num_completions(self) -> int:
508 """
509 Number of completions added so far.
511 """
513 return len(self._completions)
515 def split_off_prefix(self, delim: str | None = None, /):
516 """
517 Move everything up to the last occurrence of ``delim``
518 from :attr:`~CompletionCollector.prefix`
519 to :attr:`~CompletionCollector.iprefix`.
521 :param delim:
522 delimiter to split off; :data:`None` value splits off on any whitespace
523 character, similar to :meth:`str.rsplit`.
525 """
527 delim = delim or " "
528 parts = self.prefix.rsplit(delim, maxsplit=1)
529 if len(parts) > 1:
530 self.iprefix += parts[0] + delim
531 self.prefix = parts[1]
533 def split_off_suffix(self, delim: str | None = None, /):
534 """
535 Move everything past the first occurrence of ``delim``
536 from :attr:`~CompletionCollector.suffix`
537 to :attr:`~CompletionCollector.isuffix`.
539 :param delim:
540 delimiter to split off; :data:`None` value splits off on any whitespace
541 character, similar to :meth:`str.split`.
543 """
545 delim = delim or " "
546 parts = self.suffix.split(delim, maxsplit=1)
547 if len(parts) > 1:
548 self.suffix = parts[0]
549 self.isuffix = delim + parts[1] + self.isuffix
551 def finalize(self) -> list[Completion]:
552 """
553 Finish collecting completions and return everything that was collected.
555 Do not reuse a collector after it was finalized.
557 :returns:
558 list of completions, sorted by their groups and preferred ordering
559 within each group.
561 If all completions start with a common prefix, a single completion
562 is returned containing this prefix.
564 """
566 if len(self._completions) > 1:
567 c0 = self._completions[0]
569 iprefix = c0.iprefix
570 isuffix = c0.isuffix
572 if (
573 self.full_prefix.startswith(iprefix)
574 and self.full_suffix.endswith(isuffix)
575 and all(
576 c.iprefix == iprefix and c.isuffix == isuffix
577 for c in self._completions
578 )
579 ):
580 # If all completions have the same `iprefix` and `isuffix`...
581 common_prefix = _commonprefix(
582 list(c.completion for c in self._completions)
583 )
584 if common_prefix and len(iprefix) + len(common_prefix) > len(
585 self.iprefix
586 ) + len(self.prefix):
587 # ...and they have a common prefix that is longer than what's entered so far,
588 # then complete this common prefix.
589 rsuffix = ""
590 rsymbols = ""
591 if all(
592 common_prefix == c.completion and rsuffix == c.rsuffix
593 for c in self._completions
594 ):
595 # If completing common prefix actually fulfills a completion, add `rsuffix` as well.
596 rsuffix = c0.rsuffix
597 rsymbols = c0.rsymbols
598 return [
599 Completion(
600 iprefix=iprefix,
601 completion=common_prefix,
602 rsuffix=rsuffix,
603 rsymbols=rsymbols,
604 isuffix=isuffix,
605 comment=None,
606 dprefix="",
607 dsuffix="",
608 group_id=(0, 0),
609 group_color_tag=None,
610 )
611 ]
613 self._completions.sort()
614 return self._completions
617_MAX_COMPLETION_CORRECTIONS: int = 1
618_MAX_COMPLETION_CORRECTIONS_RATE: float = 1 / 3
621@_t.final
622class _CorrectingCollector(CompletionCollector):
623 def __init__(self, text: str, pos: int):
624 super().__init__(text, pos)
626 self._has_corrections = False
628 def add(
629 self,
630 completion: str,
631 /,
632 *,
633 comment: str | None = None,
634 dprefix: str = "",
635 dsuffix: str = "",
636 color_tag: str | None = None,
637 ):
638 if not completion or completion in self.dedup_words:
639 return
641 a = self.prefix + self.suffix
642 b = completion
643 corrections = _corrections(a, b)
644 threshold = (
645 _MAX_COMPLETION_CORRECTIONS
646 + _MAX_COMPLETION_CORRECTIONS_RATE * (len(a) + len(b)) / 2
647 )
649 if corrections <= 1:
650 # this is a simple mistype, add it as usual
651 self._add(
652 completion,
653 comment=comment,
654 dprefix=dprefix,
655 dsuffix=dsuffix,
656 color_tag=color_tag,
657 )
658 elif corrections <= threshold:
659 # this is a correction, add it into corrections group
660 if comment:
661 comment = "corrected: " + comment
662 else:
663 comment = "corrected"
664 with self.save_state():
665 self._group_id = 0xFFFFFFFE # (big enough) - 1
666 self._group_color_tag = "corrected"
667 self._add(
668 completion,
669 comment=comment,
670 dprefix=dprefix,
671 dsuffix=dsuffix,
672 color_tag=color_tag,
673 )
674 self._has_corrections = True
676 def finalize(self) -> list[Completion]:
677 if self._has_corrections:
678 c0 = self._completions[0]
680 iprefix = ""
681 prefix = self.full_prefix
682 suffix = self.full_suffix
683 isuffix = ""
685 if prefix.startswith(c0.iprefix):
686 l = len(c0.iprefix)
687 iprefix = prefix[:l]
688 prefix = prefix[l:]
690 if suffix.endswith(c0.isuffix):
691 l = len(c0.isuffix)
692 isuffix = suffix[-l:]
693 suffix = suffix[:-l]
695 # If we have corrections, add original value to the end.
696 with self.save_state():
697 self._group_id = 0xFFFFFFFF # (big enough)
698 self._group_color_tag = "original"
699 self.iprefix = iprefix
700 self.isuffix = isuffix
701 self._add(prefix + suffix, comment="original")
703 self._completions.sort()
704 return self._completions
707def _corrections(a: str, b: str) -> float:
708 # Damerau–Levenshtein distance (Optimal String Alignment distance)
710 a = a.casefold()
711 b = b.casefold()
712 d = [x[:] for x in [[0.0] * (len(b) + 1)] * (len(a) + 1)]
713 for i in range(len(a) + 1):
714 d[i][0] = i
715 for j in range(len(b) + 1):
716 d[0][j] = j
717 for i in range(1, len(a) + 1):
718 for j in range(1, len(b) + 1):
719 d[i][j] = min(
720 # Add to `a`:
721 d[i - 1][j] + 1,
722 # Add to `b`:
723 d[i][j - 1] + 1,
724 # Replace:
725 d[i - 1][j - 1] + (a[i - 1] != b[j - 1]),
726 # Transpose:
727 (
728 d[i - 2][j - 2] + (a[i - 1] != b[j - 1])
729 if i > 2 and j > 2 and a[i - 2 : i] == b[j - 1 : j - 3 : -1]
730 else math.inf
731 ),
732 )
734 return d[-1][-1]
737def _commonprefix(m: list[str]) -> str:
738 if not m:
739 return ""
740 s1 = min(m)
741 s2 = max(m)
742 for i, c in enumerate(s1):
743 if c != s2[i]:
744 return s1[:i]
745 return s1
748class Completer(abc.ABC):
749 """
750 An interface for text completion providers.
752 """
754 def complete(
755 self, text: str, pos: int, /, *, do_corrections: bool = True
756 ) -> list[Completion]:
757 """
758 Complete the given text at the given cursor position.
760 :param text:
761 text that is being completed.
762 :param pos:
763 position of the cursor in the text. ``0`` means the cursor
764 is before the first character, ``len(text)`` means the cursor
765 is after the last character.
766 :param do_corrections:
767 if :data:`True` (default), completion system will try to guess
768 if there are any misspells in the ``text``, and offer to correct them.
769 :returns:
770 a sorted list of completions.
772 If all completions start with a common prefix, a single completion
773 is returned containing this prefix.
775 """
777 collector = CompletionCollector(text, pos)
778 with collector.save_state():
779 self._process(collector)
780 completions = collector.finalize()
781 if completions or not do_corrections:
782 return completions
784 collector = _CorrectingCollector(text, pos)
785 with collector.save_state():
786 self._process(collector)
787 return collector.finalize()
789 @abc.abstractmethod
790 def _process(self, collector: CompletionCollector, /):
791 """
792 Generate completions and add them to the given collector.
794 Implementing this class is straight forward, just feed all possible
795 completions to the collector. For example, let's implement a completer
796 for environment variables:
798 .. code-block:: python
800 class EnvVarCompleter(Completer):
801 def _process(self, collector: CompletionCollector):
802 for var in os.environ.keys():
803 collector.add(var)
805 """
807 raise NotImplementedError()
809 def _get_completion_model(
810 self, *, is_many: bool = False
811 ) -> _CompleterSerializer.Model:
812 """
813 Internal, do not use.
815 """
817 return _CompleterSerializer.CustomCompleter(self)
820class Empty(Completer):
821 """
822 An empty completer that returns no values.
824 """
826 def _process(self, collector: CompletionCollector):
827 pass # nothing to do
829 def _get_completion_model(
830 self, *, is_many: bool = False
831 ) -> _CompleterSerializer.Model:
832 return _CompleterSerializer.Model()
835@dataclass(frozen=True, slots=True)
836class Option:
837 """
838 A single completion option for the :class:`Choice` completer.
840 """
842 completion: str
843 """
844 This string will replace an element that is being completed.
846 """
848 comment: str | None = None
849 """
850 Short comment displayed alongside the completion.
852 """
855class Choice(Completer):
856 """
857 Completes input from a predefined list of completions.
859 :param choices:
860 options to choose completion from.
862 """
864 def __init__(self, choices: _t.Collection[Option], /):
865 self._choices: _t.Collection[Option] = choices
867 def _process(self, collector: CompletionCollector, /):
868 for choice in self._choices:
869 collector.add(choice.completion, comment=choice.comment)
871 def _get_completion_model(
872 self, *, is_many: bool = False
873 ) -> _CompleterSerializer.Model:
874 if any(option.comment for option in self._choices):
875 return _CompleterSerializer.ChoiceWithDesc(
876 [(option.completion, option.comment or "") for option in self._choices]
877 )
878 else:
879 return _CompleterSerializer.Choice(
880 [option.completion for option in self._choices]
881 )
884class Alternative(Completer):
885 """
886 Joins outputs from multiple completers.
888 :param completers:
889 list of inner completers.
891 This is a list of tuples. First tuple element is a description of a completion
892 group. It will be displayed when this completer is used in shells
893 that support it (namely, ZSH). Second tuple element is the inner completer
894 itself.
896 """
898 def __init__(self, completers: list[tuple[str, Completer]], /):
899 self._completers = completers
901 def _process(self, collector: CompletionCollector, /):
902 for _, completer in self._completers:
903 with collector.save_state():
904 collector.add_group()
905 completer._process(collector)
907 def _get_completion_model(
908 self, *, is_many: bool = False
909 ) -> _CompleterSerializer.Model:
910 return _CompleterSerializer.Alternative(
911 [
912 (name, completer._get_completion_model(is_many=is_many))
913 for name, completer in self._completers
914 ]
915 )
918class List(Completer):
919 """
920 Completes a value-separated list of elements.
922 :param inner:
923 completer for list items.
924 :param delimiter:
925 a character that separates list items. :data:`None` separates by any whitespace
926 character, similar to :meth:`str.split`.
927 :param allow_duplicates:
928 whether to show completions that already appear in the list.
930 """
932 def __init__(
933 self,
934 inner: Completer,
935 /,
936 *,
937 delimiter: str | None = None,
938 allow_duplicates: bool = False,
939 ):
940 self._inner = inner
941 if delimiter == "":
942 raise ValueError("empty delimiter")
943 self._delimiter = delimiter
944 self._allow_duplicates = allow_duplicates
946 def _process(self, collector: CompletionCollector, /):
947 collector.split_off_prefix(self._delimiter)
948 collector.split_off_suffix(self._delimiter)
949 collector.rsuffix = self._delimiter or " "
950 collector.rsymbols += self._delimiter or string.whitespace
952 if not self._allow_duplicates:
953 dedup_words = set(
954 collector.iprefix.split(self._delimiter)
955 + collector.isuffix.split(self._delimiter)
956 )
957 if collector.text in dedup_words:
958 dedup_words.remove(collector.text)
959 collector.dedup_words = frozenset(dedup_words)
960 else:
961 collector.dedup_words = frozenset()
963 self._inner._process(collector)
965 def _get_completion_model(
966 self, *, is_many: bool = False
967 ) -> _CompleterSerializer.Model:
968 if is_many:
969 return _CompleterSerializer.ListMany(
970 self._delimiter or " ", self._inner._get_completion_model()
971 )
972 else:
973 return _CompleterSerializer.List(
974 self._delimiter or " ", self._inner._get_completion_model()
975 )
978class Tuple(Completer):
979 """
980 Completes a value-separated tuple of elements.
982 :param inner:
983 completers for each tuple element.
984 :param delimiter:
985 a character that separates list items. :data:`None` separates by any whitespace
986 character, similar to :meth:`str.split`.
988 """
990 def __init__(self, *inner: Completer, delimiter: str | None = None):
991 self._inner = inner
992 if delimiter == "":
993 raise ValueError("empty delimiter")
994 self._delimiter = delimiter
996 def _process(self, collector: CompletionCollector, /):
997 pos = len(collector.prefix.split(self._delimiter))
998 if (
999 pos
1000 and self._delimiter is None
1001 and collector.prefix
1002 and collector.prefix[-1] in string.whitespace
1003 ):
1004 # `.split(None)` will trim whitespaces at the end.
1005 # Make sure we count those towards the current position in the tuple.
1006 pos += 1
1007 if pos > len(self._inner):
1008 return
1009 if pos > 0:
1010 pos -= 1
1012 collector.split_off_prefix(self._delimiter)
1013 collector.split_off_suffix(self._delimiter)
1014 collector.rsuffix = self._delimiter or " "
1015 collector.rsymbols += self._delimiter or string.whitespace
1017 self._inner[pos]._process(collector)
1019 def _get_completion_model(
1020 self, *, is_many: bool = False
1021 ) -> _CompleterSerializer.Model:
1022 if is_many:
1023 return _CompleterSerializer.TupleMany(
1024 self._delimiter or " ",
1025 [inner._get_completion_model() for inner in self._inner],
1026 )
1027 else:
1028 return _CompleterSerializer.Tuple(
1029 self._delimiter or " ",
1030 [inner._get_completion_model() for inner in self._inner],
1031 )
1034class File(Completer):
1035 """
1036 Completes file paths.
1038 :param extensions:
1039 allowed file extensions, should include the leading dot.
1041 """
1043 def __init__(self, extensions: str | _t.Collection[str] | None = None):
1044 if isinstance(extensions, str):
1045 self._extensions = [extensions]
1046 elif extensions is not None:
1047 self._extensions = list(extensions)
1048 else:
1049 self._extensions = None
1051 def _process(self, collector: CompletionCollector, /):
1052 base, name = os.path.split(collector.prefix)
1053 if base and not base.endswith(os.path.sep):
1054 base += os.path.sep
1055 collector.iprefix += base
1056 collector.prefix = name
1057 collector.suffix = collector.suffix.split(os.sep, maxsplit=1)[0]
1058 resolved = pathlib.Path(base).expanduser().resolve()
1059 rsuffix = collector.rsuffix
1060 if resolved.is_dir():
1061 if name.startswith("."):
1062 collector.rsuffix = ""
1063 collector.add(os.path.curdir + os.path.sep, color_tag="dir")
1064 collector.add(os.path.pardir + os.path.sep, color_tag="dir")
1065 if name.startswith("~"):
1066 collector.rsuffix = ""
1067 collector.add("~" + os.path.sep, color_tag="dir")
1068 try:
1069 for path in resolved.iterdir():
1070 if path.is_dir():
1071 if path.is_symlink():
1072 color_tag = "symlink"
1073 dsuffix = "@"
1074 else:
1075 color_tag = "dir"
1076 dsuffix = ""
1077 collector.rsuffix = ""
1078 collector.add(
1079 path.name + os.sep, color_tag=color_tag, dsuffix=dsuffix
1080 )
1081 elif self._extensions is None or any(
1082 path.name.endswith(ext) for ext in self._extensions
1083 ):
1084 collector.rsuffix = rsuffix
1085 color_tag = None
1086 dsuffix = ""
1087 if path.is_symlink():
1088 color_tag = "symlink"
1089 dsuffix = "@"
1090 elif path.is_file():
1091 if (os.name != "nt" and os.access(path, os.X_OK)) or (
1092 os.name == "nt" and path.suffix == ".exe"
1093 ):
1094 color_tag = "exec"
1095 dsuffix = "*"
1096 else:
1097 color_tag = "file"
1098 elif path.is_socket():
1099 color_tag = "socket"
1100 dsuffix = "="
1101 elif path.is_fifo():
1102 color_tag = "pipe"
1103 dsuffix = "|"
1104 elif path.is_block_device():
1105 color_tag = "block_device"
1106 dsuffix = "#"
1107 elif path.is_char_device():
1108 color_tag = "char_device"
1109 dsuffix = "%"
1110 collector.add(path.name, color_tag=color_tag, dsuffix=dsuffix)
1111 except PermissionError:
1112 return
1114 def _get_completion_model(
1115 self, *, is_many: bool = False
1116 ) -> _CompleterSerializer.Model:
1117 return _CompleterSerializer.File(
1118 "|".join(extension.lstrip(".") for extension in self._extensions or [])
1119 )
1122class Dir(File):
1123 """
1124 Completes directories.
1126 """
1128 def __init__(self):
1129 super().__init__([])
1131 def _get_completion_model(
1132 self, *, is_many: bool = False
1133 ) -> _CompleterSerializer.Model:
1134 return _CompleterSerializer.Dir()
1137class _CompleterSerializer:
1138 def __init__(
1139 self,
1140 add_help: bool,
1141 add_version: bool,
1142 add_bug_report: bool,
1143 path: str = "",
1144 custom_completers: dict[tuple[str, str], list[Completer]] = {},
1145 ):
1146 self._path = path
1147 self._custom_completers = custom_completers
1148 self._subcommands: dict[str, tuple[_CompleterSerializer, bool, str]] = {}
1149 self._positional = 0
1150 self._flags: list[
1151 tuple[
1152 list[str],
1153 str | None,
1154 str | tuple[str, ...] | None,
1155 int | _t.Literal["-", "+", "*", "?"],
1156 _CompleterSerializer.Model,
1157 ]
1158 ] = []
1159 self._add_help = add_help
1160 if add_help:
1161 self._flags.append(
1162 (
1163 ["-h", "--help"],
1164 "show help message and exit",
1165 None,
1166 "-",
1167 _CompleterSerializer.Model(),
1168 )
1169 )
1170 self._add_version = add_version
1171 if add_version:
1172 self._flags.append(
1173 (
1174 ["-V", "--version"],
1175 "show program version and exit",
1176 None,
1177 "-",
1178 _CompleterSerializer.Model(),
1179 )
1180 )
1181 self._add_bug_report = add_bug_report
1182 if add_bug_report:
1183 self._flags.append(
1184 (
1185 ["--bug-report"],
1186 "show environment data for bug report and exit",
1187 None,
1188 "-",
1189 _CompleterSerializer.Model(),
1190 )
1191 )
1193 def add_argument(self, *args: str, **kwargs):
1194 if self._add_help and "--help" in args:
1195 return
1196 if self._add_version and "--version" in args:
1197 return
1198 if self._add_bug_report and "--bug-report" in args:
1199 return
1201 help = kwargs.get("help") or ""
1203 if help == argparse.SUPPRESS:
1204 return
1206 help = yuio.string.strip_color_tags(help)
1208 if all(not arg.startswith("-") for arg in args):
1209 args = (str(self._positional),)
1210 self._positional += 1
1212 action = kwargs.get("action")
1213 metavar = kwargs.get("metavar") or ""
1214 nargs = kwargs.get(
1215 "nargs",
1216 (
1217 0
1218 if action
1219 in [
1220 "store_const",
1221 "store_true",
1222 "store_false",
1223 "append_const",
1224 "count",
1225 "help",
1226 "version",
1227 ]
1228 else 1
1229 ),
1230 )
1231 if get_parser := getattr(action, "get_parser", None):
1232 parser = get_parser()
1233 completer = parser.completer()
1234 if completer is None:
1235 completer = Empty()
1236 completion_model = completer._get_completion_model(
1237 is_many=parser.supports_parse_many()
1238 )
1239 else:
1240 completion_model = self.Model()
1242 self._args = ";".join(args)
1243 completion_model.collect(self)
1245 self._flags.append((list(args), help, metavar, nargs, completion_model))
1247 def add_mutually_exclusive_group(self, *args, **kwargs):
1248 return self
1250 def add_argument_group(self, *args, **kwargs):
1251 return self
1253 def add_subparsers(self, *args, **kwargs):
1254 return self
1256 def add_parser(
1257 self,
1258 name: str,
1259 *,
1260 aliases: _t.Sequence[str] = (),
1261 help: str,
1262 **kwargs,
1263 ):
1264 if help != argparse.SUPPRESS:
1265 help = yuio.string.strip_color_tags(str(help or ""))
1266 serializer = _CompleterSerializer(
1267 self._add_help,
1268 self._add_version,
1269 self._add_bug_report,
1270 f"{self._path}/{name}",
1271 self._custom_completers,
1272 )
1273 self._subcommands[name] = (serializer, False, help)
1274 for alias in aliases:
1275 self._subcommands[alias] = (serializer, True, help)
1276 return serializer
1278 def register_custom_completer(self, completer: Completer) -> str:
1279 completers = self._custom_completers.setdefault((self._path, self._args), [])
1280 data = json.dumps([self._path, self._args, len(completers)])
1281 completers.append(completer)
1282 return data
1284 def get_custom_completer(self, data: str) -> Completer | None:
1285 try:
1286 path, args, index = json.loads(data)
1287 return self._custom_completers[(path, args)][index]
1288 except (json.JSONDecodeError, IndexError, TypeError, ValueError):
1289 pass
1290 return None
1292 def as_parser(self) -> argparse.ArgumentParser:
1293 # We've implemented all methods that `Config._setup_arg_parser` could call.
1294 return _t.cast(argparse.ArgumentParser, self)
1296 _SPECIAL_SYMBOLS = str.maketrans("\r\n\a\b\t", " ")
1298 def _dump(self, path: str, result: list[str]):
1299 if self._subcommands:
1300 self._flags.append(
1301 (
1302 ["c"],
1303 "subcommand",
1304 "<cmd>",
1305 1,
1306 _CompleterSerializer.ChoiceWithDesc(
1307 [
1308 (name, help)
1309 for name, (_, is_alias, help) in self._subcommands.items()
1310 if not is_alias and help != argparse.SUPPRESS
1311 ]
1312 ),
1313 )
1314 )
1316 for opts, desc, meta, nargs, completer in self._flags:
1317 if not isinstance(meta, tuple):
1318 meta = (meta,)
1319 compspec: list[str] = [
1320 path,
1321 " ".join(opts),
1322 desc or "",
1323 " ".join(
1324 re.sub(
1325 r"[\\ ]",
1326 lambda s: "\\S" if s.group() == " " else "\\L",
1327 str(m),
1328 )
1329 or ""
1330 for m in meta
1331 ),
1332 str(nargs),
1333 *completer.dump(),
1334 ]
1336 result.append(
1337 "\t".join(item.translate(self._SPECIAL_SYMBOLS) for item in compspec)
1338 )
1340 for subcommand, (serializer, *_) in self._subcommands.items():
1341 serializer._dump(f"{path}/{subcommand}", result)
1343 def _collect_nested(self, compspec: list[object]):
1344 for item in compspec:
1345 self._collect_nested_item(item)
1347 def _collect_nested_item(self, item: object):
1348 if isinstance(item, _CompleterSerializer.Model):
1349 item.collect(self)
1350 elif isinstance(item, (list, tuple)):
1351 for sub_item in item:
1352 self._collect_nested_item(sub_item)
1354 @staticmethod
1355 def _dump_nested(compspec: list[object]) -> list[str]:
1356 contents = []
1358 for item in compspec:
1359 contents.extend(_CompleterSerializer._dump_nested_item(item))
1361 return contents
1363 @staticmethod
1364 def _dump_nested_item(item: object) -> list[str]:
1365 contents = []
1367 if isinstance(item, _CompleterSerializer.Model):
1368 contents.extend(item.dump())
1369 elif isinstance(item, list):
1370 contents.append(str(len(item)))
1371 for sub_item in item:
1372 contents.extend(_CompleterSerializer._dump_nested_item(sub_item))
1373 elif isinstance(item, tuple):
1374 for sub_item in item:
1375 contents.extend(_CompleterSerializer._dump_nested_item(sub_item))
1376 else:
1377 contents.append(str(item))
1379 return contents
1381 @dataclass
1382 class ModelBase:
1383 tag: typing.ClassVar[str] = "-"
1385 def __init_subclass__(cls, tag: str = "-", **kwargs):
1386 super().__init_subclass__(**kwargs)
1387 cls.tag = tag
1389 @dataclass
1390 class Model(ModelBase):
1391 def collect(self, s: _CompleterSerializer):
1392 compspec = [getattr(self, field.name) for field in dataclasses.fields(self)]
1393 s._collect_nested(compspec)
1395 def dump(self) -> list[str]:
1396 compspec = [getattr(self, field.name) for field in dataclasses.fields(self)]
1397 contents = _CompleterSerializer._dump_nested(compspec)
1398 return [self.tag, str(len(contents)), *contents]
1400 @dataclass
1401 class File(Model, tag="f"):
1402 ext: str
1404 @dataclass
1405 class Dir(Model, tag="d"):
1406 pass
1408 @dataclass
1409 class Choice(Model, tag="c"):
1410 choices: list[str]
1412 def dump(self) -> list[str]:
1413 return [self.tag, str(len(self.choices)), *self.choices]
1415 @dataclass
1416 class ChoiceWithDesc(Model, tag="cd"):
1417 choices: list[tuple[str, str]]
1419 def dump(self) -> list[str]:
1420 return [
1421 self.tag,
1422 str(len(self.choices) * 2),
1423 *[c[0] for c in self.choices],
1424 *[yuio.string.strip_color_tags(c[1]) for c in self.choices],
1425 ]
1427 @dataclass
1428 class Git(Model, tag="g"):
1429 class Mode(enum.Enum):
1430 Branch = "b"
1431 Remote = "r"
1432 Tag = "t"
1433 Head = "h"
1435 modes: set[Mode] = dataclasses.field(
1436 default_factory=lambda: {
1437 _CompleterSerializer.Git.Mode.Branch,
1438 _CompleterSerializer.Git.Mode.Tag,
1439 _CompleterSerializer.Git.Mode.Head,
1440 }
1441 )
1443 def dump(self) -> list[str]:
1444 return [self.tag, "1", "".join(mode.value for mode in self.modes)]
1446 @dataclass
1447 class List(Model, tag="l"):
1448 delim: str
1449 inner: _CompleterSerializer.Model
1451 @dataclass
1452 class ListMany(List, tag="lm"):
1453 pass
1455 @dataclass
1456 class Tuple(Model, tag="t"):
1457 delim: str
1458 inner: list[_CompleterSerializer.Model]
1460 @dataclass
1461 class TupleMany(Tuple, tag="tm"):
1462 pass
1464 @dataclass
1465 class Alternative(Model, tag="a"):
1466 alternatives: list[tuple[str, _CompleterSerializer.Model]]
1468 @dataclass
1469 class CustomCompleter(Model, tag="cc"):
1470 completer: Completer
1471 _data: str | None = None
1473 def collect(self, s: _CompleterSerializer):
1474 self._data = s.register_custom_completer(self.completer)
1476 def dump(self) -> list[str]:
1477 assert self._data is not None
1478 return [
1479 self.tag,
1480 "1",
1481 self._data,
1482 ]
1485def _run_custom_completer(s: _CompleterSerializer, data: str, word: str):
1486 completer = s.get_custom_completer(data)
1487 if completer is None:
1488 return
1489 completions = completer.complete(word, len(word), do_corrections=False)
1490 for completion in completions:
1491 print(
1492 f"{completion.iprefix}{completion.completion}{completion.isuffix}\t{completion.comment or ''}",
1493 file=sys.__stdout__,
1494 )
1497_PROG_ESCAPE = str.maketrans(
1498 string.punctuation + string.whitespace,
1499 "_" * (len(string.punctuation) + len(string.whitespace)),
1500)
1503def _write_completions(
1504 s: _CompleterSerializer, prog: str | None = None, shell: str = "all"
1505):
1506 import yuio.io
1508 true_prog = prog or pathlib.Path(sys.argv[0]).stem
1509 prog = (prog or pathlib.Path(sys.argv[0]).stem).translate(_PROG_ESCAPE)
1511 if pathlib.Path(sys.argv[0]).stem == "__main__":
1512 yuio.io.failure(
1513 "You've invoked this program as a python module, most likely with "
1514 "`python -m <module>`. For completions to work, the program "
1515 "must be invoked as a command in your `$PATH`"
1516 )
1517 sys.exit(1)
1518 if not prog:
1519 yuio.io.failure("Failed to generate completion because program name is empty")
1520 sys.exit(1)
1521 if not re.match(r"^[a-zA-Z0-9_-]+$", prog):
1522 yuio.io.failure(
1523 "Failed to generate completion due to "
1524 "forbidden characters in program name: `%r`",
1525 prog,
1526 )
1527 sys.exit(1)
1529 if shell == "uninstall":
1530 shell = "all"
1531 yuio.io.heading("Uninstalling completions for `%s`", true_prog)
1532 install = False
1533 else:
1534 yuio.io.heading("Generating completions for `%s`", true_prog)
1535 install = True
1537 if not shutil.which(true_prog):
1538 yuio.io.warning(
1539 "Program `%s` is not in your `$PATH`. Completions might not be able "
1540 "to initialize",
1541 true_prog,
1542 )
1544 if os.name == "nt":
1545 data_home = cache_home = config_home = pathlib.Path(
1546 os.environ.get("LOCALAPPDATA") or (pathlib.Path.home() / "AppData/Local")
1547 )
1548 else:
1549 data_home = pathlib.Path(
1550 os.environ.get("XDG_DATA_HOME") or (pathlib.Path.home() / ".local/share")
1551 )
1552 cache_home = pathlib.Path(
1553 os.environ.get("XDG_CACHE_HOME") or (pathlib.Path.home() / ".cache")
1554 )
1555 config_home = pathlib.Path(
1556 os.environ.get("XDG_CONFIG_HOME") or (pathlib.Path.home() / ".config")
1557 )
1559 compdata_path = data_home / f"yuio/{prog}.compdata.tsv"
1561 if install:
1562 task_heading = "Installing completions"
1563 else:
1564 task_heading = "Uninstalling completions"
1566 with yuio.io.Task(task_heading) as t:
1567 if install:
1568 result = []
1569 s._dump("", result)
1570 compdata = "\n".join(result)
1572 os.makedirs(data_home / "yuio", exist_ok=True)
1573 compdata_path.write_text(compdata)
1574 yuio.io.info(
1575 "Wrote <c note>completion data</c> to <c path>%s</c>", compdata_path
1576 )
1577 elif compdata_path.exists():
1578 os.remove(compdata_path)
1579 yuio.io.info("Removed <c path>%s</c>", compdata_path)
1581 if shell in ["all", "bash"]:
1582 t.comment("Bash")
1583 _write_bash_script(
1584 prog,
1585 true_prog,
1586 install,
1587 compdata_path,
1588 data_home,
1589 cache_home,
1590 config_home,
1591 )
1592 if shell in ["all", "zsh"]:
1593 t.comment("Zsh")
1594 _write_zsh_script(
1595 prog,
1596 true_prog,
1597 install,
1598 compdata_path,
1599 data_home,
1600 cache_home,
1601 config_home,
1602 )
1603 if shell in ["all", "fish"]:
1604 t.comment("Fish")
1605 _write_fish_script(
1606 prog,
1607 true_prog,
1608 install,
1609 compdata_path,
1610 data_home,
1611 cache_home,
1612 config_home,
1613 )
1614 if shell in ["all", "pwsh"]:
1615 t.comment("PowerShell")
1616 _write_pwsh_script(
1617 prog,
1618 true_prog,
1619 install,
1620 compdata_path,
1621 data_home,
1622 cache_home,
1623 config_home,
1624 )
1626 yuio.io.success("All done! Please restart your shell for changes to take effect.")
1627 if install:
1628 yuio.io.info("Run `%s --completions uninstall` to undo all changes.", prog)
1631def _write_bash_script(
1632 prog: str,
1633 true_prog: str,
1634 install: bool,
1635 compdata_path: pathlib.Path,
1636 data_home: pathlib.Path,
1637 cache_home: pathlib.Path,
1638 config_home: pathlib.Path,
1639):
1640 import yuio.exec
1641 import yuio.io
1643 if os.name == "nt":
1644 yuio.io.warning(
1645 "Skipped <c note>Bash</c>: completion script doesn't support windows"
1646 )
1647 return
1649 try:
1650 bash_completions_home = yuio.exec.exec(
1651 "bash",
1652 "-lc",
1653 'echo -n "${BASH_COMPLETION_USER_DIR:-${XDG_DATA_HOME:-$HOME/.local/share}/bash-completion}/completions/"',
1654 ).splitlines()[-1]
1655 except (subprocess.CalledProcessError, IndexError):
1656 bash_completions_home = data_home / "bash-completion/completions/"
1657 bash_completions_home = pathlib.Path(bash_completions_home)
1658 script_dest = bash_completions_home / true_prog
1660 if install:
1661 _write_script(script_dest, "complete.bash", prog, true_prog, compdata_path)
1662 yuio.io.info("Wrote <c note>Bash</c> script to <c path>%s</c>", script_dest)
1663 elif script_dest.exists():
1664 os.remove(script_dest)
1665 yuio.io.info("Removed <c path>%s</c>", script_dest)
1668def _write_zsh_script(
1669 prog: str,
1670 true_prog: str,
1671 install: bool,
1672 compdata_path: pathlib.Path,
1673 data_home: pathlib.Path,
1674 cache_home: pathlib.Path,
1675 config_home: pathlib.Path,
1676):
1677 import yuio.exec
1678 import yuio.io
1680 if os.name == "nt":
1681 yuio.io.warning(
1682 "Skipped <c note>Zsh</c>: completion script doesn't support windows"
1683 )
1684 return
1686 needs_cache_cleanup = False
1688 zsh_completions_home = data_home / "zsh/completions"
1689 script_dest = zsh_completions_home / ("_" + true_prog)
1691 if install:
1692 needs_cache_cleanup = True
1693 _write_script(script_dest, "complete.zsh", prog, true_prog, compdata_path)
1694 yuio.io.info("Wrote <c note>Zsh</c> script to <c path>%s</c>", script_dest)
1695 elif script_dest.exists():
1696 needs_cache_cleanup = True
1698 os.remove(script_dest)
1699 yuio.io.info("Removed <c path>%s</c>", script_dest)
1701 try:
1702 fpath = (
1703 yuio.exec.exec(
1704 "zsh",
1705 "-lc",
1706 "echo -n $FPATH",
1707 )
1708 .splitlines()[-1]
1709 .split(":")
1710 )
1711 except (subprocess.CalledProcessError, IndexError):
1712 fpath = []
1714 try:
1715 zhome = yuio.exec.exec(
1716 "zsh",
1717 "-lc",
1718 "echo -n ${ZDOTDIR:-$HOME}",
1719 ).splitlines()[-1]
1720 except (subprocess.CalledProcessError, IndexError):
1721 zhome = pathlib.Path.home()
1723 zhome = pathlib.Path(zhome)
1724 zprofile_path = zhome / ".zprofile"
1725 zprofile_append_text = f"\nfpath=({zsh_completions_home} $fpath)\n"
1727 if install:
1728 if str(zsh_completions_home) not in fpath:
1729 with open(zprofile_path, "a") as f:
1730 f.write(zprofile_append_text)
1731 yuio.io.info(
1732 "<c note>Note:</c> modified <c path>%s</c> to add <c path>%s</c> to `fpath`",
1733 zprofile_path,
1734 zsh_completions_home,
1735 )
1736 elif zprofile_path.exists():
1737 zprofile_text = zprofile_path.read_text()
1738 if zprofile_append_text in zprofile_text:
1739 yuio.io.info(
1740 "<c note>Note:</c> modifications to <c path>%s</c> are not removed"
1741 " because other completions might rely on them",
1742 zprofile_path,
1743 )
1745 if not needs_cache_cleanup:
1746 return
1748 # Try to remove completions cache from the most common places.
1749 for zcomp_basedir in [zhome, cache_home / "prezto"]:
1750 if not zcomp_basedir.exists() or not zcomp_basedir.is_dir():
1751 continue
1752 for file in zcomp_basedir.iterdir():
1753 if file.is_file() and re.match(r"^\.?zcompdump", file.name):
1754 os.remove(file)
1755 yuio.io.info(
1756 "<c note>Note:</c> deleted Zsh completions cache at <c path>%s</c>",
1757 file,
1758 )
1760 try:
1761 # Run zsh with the right flags in case zshrc runs compinit.
1762 # If after generating completions user runs `zsh` without the `-l` flag,
1763 # our changes to fpath will not be visible, and compinit will dump
1764 # an invalid version of cache. To avoid this, we call zsh ourselves
1765 # before the user has a chance to do it. Notice, though, that we don't
1766 # run `compdump`. This is because we can't be sure that the user uses
1767 # the default cache path (~/.zcompdump).
1768 yuio.exec.exec("zsh", "-lc", "true")
1769 except subprocess.CalledProcessError:
1770 pass
1773def _write_fish_script(
1774 prog: str,
1775 true_prog: str,
1776 install: bool,
1777 compdata_path: pathlib.Path,
1778 data_home: pathlib.Path,
1779 cache_home: pathlib.Path,
1780 config_home: pathlib.Path,
1781):
1782 import yuio.io
1784 if os.name == "nt":
1785 yuio.io.warning(
1786 "Skipped <c note>Fish</c>: completion script doesn't support windows"
1787 )
1788 return
1790 fish_completions_home = data_home / "fish/vendor_completions.d"
1791 script_dest = fish_completions_home / (true_prog + ".fish")
1793 if install:
1794 _write_script(script_dest, "complete.fish", prog, true_prog, compdata_path)
1795 yuio.io.info("Wrote <c note>Fish</c> script to <c path>%s</c>", script_dest)
1796 elif script_dest.exists():
1797 os.remove(script_dest)
1798 yuio.io.info("Removed <c path>%s</c>", script_dest)
1801def _write_pwsh_script(
1802 prog: str,
1803 true_prog: str,
1804 install: bool,
1805 compdata_path: pathlib.Path,
1806 data_home: pathlib.Path,
1807 cache_home: pathlib.Path,
1808 config_home: pathlib.Path,
1809):
1810 import yuio.exec
1811 import yuio.io
1813 if shutil.which("pwsh"):
1814 command = "pwsh"
1815 elif shutil.which("powershell"):
1816 command = "powershell"
1817 else:
1818 yuio.io.warning(
1819 "Skipped <c note>PowerShell</c>: `pwsh` command is not available"
1820 )
1821 return
1823 try:
1824 pwsh_data = (
1825 yuio.exec.exec(
1826 command,
1827 "-Command",
1828 'Write-Host "$($PSVersionTable.PSVersion);$PROFILE"',
1829 )
1830 .splitlines()[-1]
1831 .strip()
1832 )
1833 except (subprocess.CalledProcessError, IndexError) as e:
1834 yuio.io.warning(
1835 "Skipped <c note>PowerShell</c>: failed to get powershell `$PROFILE` path: %s",
1836 e,
1837 )
1838 return
1839 except FileNotFoundError:
1840 yuio.io.warning("Skipped <c note>PowerShell</c>: `pwsh` command not found")
1841 return
1842 if match := re.match(r"^(\d+(?:\.\d+)*);(.*)$", pwsh_data):
1843 version = match.group(1)
1844 profile_s = match.group(2)
1845 else:
1846 yuio.io.warning(
1847 "Skipped <c note>PowerShell</c>: can't determine powershell version"
1848 )
1849 return
1850 if not profile_s:
1851 yuio.io.warning(
1852 "Skipped <c note>PowerShell</c>: powershell `$PROFILE` path is empty"
1853 )
1854 return
1855 if tuple(int(v) for v in version.split(".")) < (5, 0, 0):
1856 yuio.io.warning(
1857 "Skipped <c note>PowerShell</c>: completions script requires "
1858 "PowerShell 5 or newer, you have %s",
1859 version,
1860 )
1861 return
1863 profile_path = pathlib.Path(profile_s).expanduser().resolve()
1864 profile_path.parent.mkdir(exist_ok=True, parents=True)
1866 data_dir = data_home / "yuio/pwsh"
1867 loader_path = data_dir / "LoadCompletions.ps1"
1868 script_dest = data_dir / f"_{true_prog}.ps1"
1869 if install:
1870 _write_script(script_dest, "complete.ps1", prog, true_prog, compdata_path)
1871 yuio.io.info(
1872 "Wrote <c note>PowerShell</c> script to <c path>%s</c>", script_dest
1873 )
1874 _write_pwsh_loader(loader_path, data_dir)
1875 elif script_dest.exists():
1876 os.remove(script_dest)
1877 yuio.io.info("Removed <c path>%s</c>", script_dest)
1879 try:
1880 data_dirs = [
1881 pathlib.Path(f).expanduser().resolve()
1882 for f in yuio.exec.exec(
1883 command,
1884 "-Command",
1885 'Write-Host ($_YUIO_COMPL_V1_INIT_PATHS -join "`n")',
1886 )
1887 .strip()
1888 .splitlines()
1889 ]
1890 except (subprocess.CalledProcessError, FileNotFoundError):
1891 return
1893 pwsh_profile_append_text = f"\n. {loader_path}\n"
1895 if install:
1896 if data_dir not in data_dirs:
1897 with open(profile_path, "a") as f:
1898 f.write(pwsh_profile_append_text)
1899 yuio.io.info(
1900 "<c note>Note:</c> modified <c path>%s</c> to call <c path>%s</c> on startup",
1901 profile_path,
1902 loader_path,
1903 )
1904 elif profile_path.exists():
1905 pwsh_profile_text = profile_path.read_text()
1906 if pwsh_profile_append_text in pwsh_profile_text:
1907 yuio.io.info(
1908 "<c note>Note:</c> modifications to <c path>%s</c> are not removed"
1909 " because other completions might rely on them",
1910 profile_path,
1911 )
1914def _write_script(
1915 path: pathlib.Path,
1916 script_name: str,
1917 prog: str,
1918 true_prog: str,
1919 compdata_path: pathlib.Path,
1920):
1921 script_template = _read_script(script_name)
1922 script = (
1923 (script_template)
1924 .replace("@prog@", prog)
1925 .replace("@true_prog@", true_prog)
1926 .replace("@data@", str(compdata_path))
1927 .replace("@version@", yuio.__version__)
1928 )
1930 path.parent.mkdir(exist_ok=True, parents=True)
1931 path.write_text(script)
1934def _read_script(script_name: str):
1935 import zipfile
1936 import zipimport
1938 if isinstance(__loader__, zipimport.zipimporter):
1939 # Yuio is imported directly from a wheel.
1940 with zipfile.ZipFile(__loader__.archive) as archive:
1941 script_template = archive.read("yuio/_complete/" + script_name)
1942 return script_template.decode()
1943 else:
1944 script_template_path = pathlib.Path(__file__).parent / "_complete" / script_name
1945 return script_template_path.read_text()
1948def _write_pwsh_loader(loader_path: pathlib.Path, data_dir: pathlib.Path):
1949 import yuio.io
1951 loader_template_path = pathlib.Path(__file__).parent / "_complete/complete_init.ps1"
1952 loader_template = loader_template_path.read_text()
1954 loader_version = re.search(
1955 r"^\s*#\s*LOADER_VERSION:\s*(\d+)\s*$", loader_template, re.MULTILINE
1956 )
1957 assert loader_version
1959 if loader_path.exists() and loader_path.is_file():
1960 current_loader = loader_path.read_text()
1961 current_version_s = re.search(
1962 r"^\s*#\s*LOADER_VERSION:\s*(\d+)\s*$", current_loader, re.MULTILINE
1963 )
1965 if current_version_s is None:
1966 yuio.io.warning(
1967 "<c note>Note:</c> can't determine version of <c path>%s</c>, "
1968 "file will be overridden",
1969 loader_path,
1970 )
1971 elif int(loader_version.group(1)) <= int(current_version_s.group(1)):
1972 return
1974 loader_template = loader_template.replace("@data@", str(data_dir))
1975 loader_path.write_text(loader_template)
1976 yuio.io.info("Wrote <c note>PowerShell</c> script to <c path>%s</c>", loader_path)