Source code for yuio.complete

# Yuio project, MIT license.
#
# https://github.com/taminomara/yuio/
#
# You're free to copy this file to your project and edit it for your needs,
# just keep this copyright line please :3

"""
This module provides autocompletion functionality for widgets and CLI.


Completer basics
----------------

All completers are derived from the :class:`Completer` base class
with a simple interface:

.. autoclass:: Completer
   :members:

.. autoclass:: Completion
   :members:


Completers
----------

Yuio provides basic completers that cover most of the cases:

.. autoclass:: Empty

.. autoclass:: Choice

.. autoclass:: Option

.. autoclass:: List

.. autoclass:: Tuple

.. autoclass:: File

.. autoclass:: Dir


Implementing your own completer
-------------------------------

To implement a custom completer, subclass :class:`Completer` and implement
its :meth:`~Completer._process` method.

.. note::

   When using a custom completer for CLI flags in :mod:`yuio.app`,
   yuio will :mod:`pickle` it, and integrate it into a shell script
   that provides command line completions.

.. class:: Completer
   :noindex:

   .. automethod:: _process

The core of the completion system, however, is a :class:`CompletionCollector`.
This is the class that is responsible for generating a final list of completions:

.. autoclass:: CompletionCollector

"""

import abc
import argparse
import base64
import contextlib
import dataclasses
import enum
import functools
import math
import os
import pathlib
import pickle
import re
import string
import subprocess
import sys
from dataclasses import dataclass

import yuio
from yuio import _t


[docs]@dataclass(frozen=True, **yuio._with_slots()) @functools.total_ordering class Completion: """A single completion.""" #: See :class:`CompletionCollector.iprefix` for details. iprefix: str #: Text of the completion. completion: str #: See :class:`CompletionCollector.rsuffix` for details. rsuffix: str #: See :class:`CompletionCollector.rsymbols` for details. rsymbols: str #: See :class:`CompletionCollector.isuffix` for details. isuffix: str #: Short comment displayed alongside the completion. comment: _t.Optional[str] #: Prefix that will be displayed before :attr:`~Completion.completion` #: when listing completions, but will not be inserted once completion #: is applied. dprefix: str #: Like :attr:`~Completion.dprefix`, but it's a suffix. dsuffix: str #: Group id, used to sort completions. #: #: Actual content of this property is an implementation detail. group_id: yuio.SupportsLt[_t.Any] = dataclasses.field(repr=False) #: Color tag that's used when displaying this completion. #: #: See :meth:`CompletionCollector.add_group` for details. group_color_tag: _t.Optional[str] def __lt__(self, other: "Completion") -> bool: """Completions are ordered by their groups and then alphabetically.""" return self.group_id < other.group_id or ( self.group_id == other.group_id and self.completion < other.completion )
[docs]@dataclass( init=False, eq=False, repr=False, **({} if sys.version_info < (3, 10, 0) else {"match_args": False}), ) class CompletionCollector: """A class that collects completions as completers are running. The text that is being completed is split into four parts, similar to what you might see in ZSH completion widgets. The main two are: .. autoattribute:: prefix .. autoattribute:: suffix When completions are added to the collector, they are checked against the current prefix to determine if they match the entered text. If they do, the completion system will replace text from `prefix` and `suffix` with the new completion string. The two additional parts are: .. autoattribute:: iprefix .. autoattribute:: isuffix For example, suppose you're completing a second element of a colon-separated list. The list completer will set up the collector so that `prefix` and `suffix` contain parts of the current list element, while `iprefix` and `isuffix` contain the rest of the elements: .. code-block:: text list_element_1:list_el|ement_2:list_element_3 └┬────────────┘└┬────┘│└┬────┘└┬────────────┘ iprefix prefix │ suffix isuffix └ cursor Now, if the completer adds a completion ``'list_elements'``, this text will replace the `prefix` and `suffix`, but not `iprefix` and `isuffix`. So, after the completion is applied, the string will look like so: .. code-block:: text list_element_1:list_elements:list_element_3 └┬──────────┘ this got replaced Finally, there is `rsuffix`: .. autoattribute:: rsuffix .. autoattribute:: rsymbols So, when completing a colon-separated list, colons will be added and removed automatically, similar to how ZSH does it. .. autoattribute:: dedup_words .. autoattribute:: full_prefix .. autoattribute:: full_suffix .. autoattribute:: text .. autoattribute:: num_completions .. automethod:: add .. automethod:: add_group .. automethod:: save_state .. automethod:: split_off_prefix .. automethod:: split_off_suffix .. automethod:: finalize """ #: Contains text that goes before the :attr:`~CompletionCollector.prefix`. #: #: This prefix is not considered when checking whether a completion #: matches a text, and it is not replaced by the completion. It will also #: not be shown in the table of completions. #: #: This prefix starts empty, and then parts of :attr:`~CompletionCollector.prefix` #: are moved to :attr:`~CompletionCollector.iprefix` as completers split it into #: list elements. iprefix: str #: Portion of the completed text before the cursor. prefix: str #: Portion of the completed text after the cursor. suffix: str #: Starts empty, and may be set to hold a list separator. #: #: This suffix will be added after the completion. However, it will be automatically #: removed if the user types one of :attr:`CompletionCollector.rsymbols`, #: or moves cursor, or alters input in some other way. rsuffix: str #: If user types one of the symbols from this string, # :attr:`~.CompletionCollector.rsuffix` will be removed. rsymbols: str #: Similar to :attr:`CompletionCollector.iprefix`, but for suffixes. isuffix: str #: Completions from this set will not be added. This is useful #: when completing lists of unique values. dedup_words: _t.FrozenSet[str] # Internal fields. _group_id: int _group_sorted: bool _group_color_tag: _t.Optional[str] def __init__(self, text: str, pos: int, /): self.iprefix = "" self.prefix = text[:pos] self.suffix = text[pos:] self.rsuffix = "" self.rsymbols = "" self.isuffix = "" self.dedup_words = frozenset() self._group_id = 0 self._group_sorted = True self._group_color_tag = None self._completions: _t.List[Completion] = [] @property def full_prefix(self) -> str: """Portion of the final completed text that goes before the cursor.""" return self.iprefix + self.prefix @property def full_suffix(self) -> str: """Portion of the final completed text that goes after the cursor.""" return self.suffix + self.isuffix @property def text(self) -> str: """Portion of the text that is being autocompleted.""" return self.prefix + self.suffix
[docs] @contextlib.contextmanager def save_state(self): """Save current state of the collector, i.e. prefixes, suffixes, etc., upon entering this context manager, then restore state upon exiting. Use this context manager when you need to call nested completers more than once to prevent changes made in one nested completer bleeding out into another nested completer. """ state = {f.name: getattr(self, f.name) for f in dataclasses.fields(self)} try: yield finally: for name, value in state.items(): setattr(self, name, value)
[docs] def add( self, completion: str, /, *, comment: _t.Optional[str] = None, dprefix: str = "", dsuffix: str = "", color_tag: _t.Optional[str] = None, ): """Add a new completion. :param completion: completed text without :attr:`~CompletionCollector.iprefix` and :attr:`~CompletionCollector.isuffix`. This text will replace :attr:`~CompletionCollector.prefix` and :attr:`~CompletionCollector.suffix`. :param comment: additional comment that will be displayed near the completion. :param color_tag: allows overriding color tag from the group. """ if ( completion and completion not in self.dedup_words and completion.startswith(self.prefix) ): self._add( completion, comment=comment, dprefix=dprefix, dsuffix=dsuffix, color_tag=color_tag, )
def _add( self, completion: str, /, *, comment: _t.Optional[str] = None, dprefix: str = "", dsuffix: str = "", color_tag: _t.Optional[str] = None, ): if not self.isuffix or self.isuffix[0] in string.whitespace: # Only add `rsuffix` if we're at the end of an array element. # Don't add `rsuffix` if we're in the middle of an array, unless the array # is separated by spaces. rsuffix = self.rsuffix rsymbols = self.rsymbols else: rsuffix = "" rsymbols = "" if self._group_sorted: group_id = (self._group_id, 0) else: group_id = (self._group_id, len(self._completions)) if color_tag is None: color_tag = self._group_color_tag self._completions.append( Completion( iprefix=self.iprefix, completion=completion, rsuffix=rsuffix, rsymbols=rsymbols, isuffix=self.isuffix, comment=comment, dprefix=dprefix, dsuffix=dsuffix, group_id=group_id, group_color_tag=color_tag, ) )
[docs] def add_group(self, /, *, sorted: bool = True, color_tag: _t.Optional[str] = None): """Add a new completions group. All completions added after call to this method will be placed to the new group. They will be grouped together, and colored according to the group's color tag. :param sorted: controls whether completions in the new group should be sorted. :param color_tag: which color tag should be used to display completions and their help messages for this group. See :attr:`yuio.widget.Option.color_tag` for details. """ self._group_id += 1 self._group_sorted = sorted self._group_color_tag = color_tag
@property def num_completions(self) -> int: """Return number of completions that were added so far.""" return len(self._completions)
[docs] def split_off_prefix(self, delim: _t.Optional[str] = None, /): """Move everything up to the last occurrence of `delim` from :attr:`~CompletionCollector.prefix` to :attr:`~CompletionCollector.iprefix`. """ delim = delim or " " parts = self.prefix.rsplit(delim, maxsplit=1) if len(parts) > 1: self.iprefix += parts[0] + delim self.prefix = parts[1]
[docs] def split_off_suffix(self, delim: _t.Optional[str] = None, /): """Move everything past the first occurrence of `delim` from :attr:`~CompletionCollector.suffix` to :attr:`~CompletionCollector.isuffix`. """ delim = delim or " " parts = self.suffix.split(delim, maxsplit=1) if len(parts) > 1: self.suffix = parts[0] self.isuffix = delim + parts[1] + self.isuffix
[docs] def finalize(self) -> _t.List[Completion]: """Finish collecting completions and return everything that was collected. Do not reuse a collector after it was finalized. """ if len(self._completions) > 1: c0 = self._completions[0] iprefix = c0.iprefix isuffix = c0.isuffix if ( self.full_prefix.startswith(iprefix) and self.full_suffix.endswith(isuffix) and all( c.iprefix == iprefix and c.isuffix == isuffix for c in self._completions ) ): # If all completions have the same `iprefix` and `isuffix`... common_prefix = yuio._commonprefix( list(c.completion for c in self._completions) ) if common_prefix and len(iprefix) + len(common_prefix) > len( self.iprefix ) + len(self.prefix): # ...and they have a common prefix that is longer than what's entered so far, # then complete this common prefix. rsuffix = "" rsymbols = "" if all( common_prefix == c.completion and rsuffix == c.rsuffix for c in self._completions ): # If completing common prefix actually fulfills a completion, add `rsuffix` as well. rsuffix = c0.rsuffix rsymbols = c0.rsymbols return [ Completion( iprefix=iprefix, completion=common_prefix, rsuffix=rsuffix, rsymbols=rsymbols, isuffix=isuffix, comment=None, dprefix="", dsuffix="", group_id=(0, 0), group_color_tag=None, ) ] self._completions.sort() return self._completions
_MAX_COMPLETION_CORRECTIONS: int = 1 _MAX_COMPLETION_CORRECTIONS_RATE: float = 1 / 3 @_t.final class _CorrectingCollector(CompletionCollector): def __init__(self, text: str, pos: int): super().__init__(text, pos) self._has_corrections = False def add( self, completion: str, /, *, comment: _t.Optional[str] = None, dprefix: str = "", dsuffix: str = "", color_tag: _t.Optional[str] = None, ): if not completion or completion in self.dedup_words: return a = self.prefix + self.suffix b = completion corrections = _corrections(a, b) threshold = ( _MAX_COMPLETION_CORRECTIONS + _MAX_COMPLETION_CORRECTIONS_RATE * (len(a) + len(b)) / 2 ) if corrections <= 1: # this is a simple mistype, add it as usual self._add( completion, comment=comment, dprefix=dprefix, dsuffix=dsuffix, color_tag=color_tag, ) elif corrections <= threshold: # this is a correction, add it into corrections group if comment: comment = "corrected: " + comment else: comment = "corrected" with self.save_state(): self._group_id = 0xFFFFFFFE # (big enough) - 1 self._group_color_tag = "corrected" self._add( completion, comment=comment, dprefix=dprefix, dsuffix=dsuffix, color_tag=color_tag, ) self._has_corrections = True def finalize(self) -> _t.List[Completion]: if self._has_corrections: c0 = self._completions[0] iprefix = "" prefix = self.full_prefix suffix = self.full_suffix isuffix = "" if prefix.startswith(c0.iprefix): l = len(c0.iprefix) iprefix = prefix[:l] prefix = prefix[l:] if suffix.endswith(c0.isuffix): l = len(c0.isuffix) isuffix = suffix[-l:] suffix = suffix[:-l] # If we have corrections, add original value to the end. with self.save_state(): self._group_id = 0xFFFFFFFF # (big enough) self._group_color_tag = "original" self.iprefix = iprefix self.isuffix = isuffix self._add(prefix + suffix, comment="original") self._completions.sort() return self._completions def _corrections(a: str, b: str) -> float: # Damerau–Levenshtein distance (Optimal String Alignment distance) a = a.casefold() b = b.casefold() d = [x[:] for x in [[0.0] * (len(b) + 1)] * (len(a) + 1)] for i in range(len(a) + 1): d[i][0] = i for j in range(len(b) + 1): d[0][j] = j for i in range(1, len(a) + 1): for j in range(1, len(b) + 1): d[i][j] = min( # Add to `a`: d[i - 1][j] + 1, # Add to `b`: d[i][j - 1] + 1, # Replace: d[i - 1][j - 1] + (a[i - 1] != b[j - 1]), # Transpose: d[i - 2][j - 2] + (a[i - 1] != b[j - 1]) if i > 2 and j > 2 and a[i - 2 : i] == b[j - 1 : j - 3 : -1] else math.inf, ) return d[-1][-1]
[docs]class Completer(abc.ABC): """An interface for text completion providers."""
[docs] def complete( self, text: str, pos: int, /, *, do_corrections: bool = True ) -> _t.List[Completion]: """ Complete the given text at the given cursor position. :param text: text that is being completed. :param pos: position of the cursor in the text. ``0`` means the cursor is before the first character, ``len(text)`` means the cursor is after the last character. :param do_corrections: if :data:`True` (default), completion system will try to guess if there are any misspells in the ``text``, and offer to correct them. """ collector = CompletionCollector(text, pos) with collector.save_state(): self._process(collector) completions = collector.finalize() if completions or not do_corrections: return completions collector = _CorrectingCollector(text, pos) with collector.save_state(): self._process(collector) return collector.finalize()
[docs] @abc.abstractmethod def _process(self, collector: CompletionCollector, /): """Generate completions and add them to the given collector. Implementing this class is straight forward, just feed all possible completions to the collector. For example, let's implement a completer for environment variables:: class EnvVarCompleter(Completer): def _process(self, collector: CompletionCollector): for var in os.environ.keys(): collector.add(var) """
def _get_completion_model( self, *, is_many: bool = False ) -> "_CompleterSerializer.Model": """Internal, do not use.""" return _CompleterSerializer.CustomCompleter(self)
[docs]class Empty(Completer): """An empty completer that returns no values.""" def _process(self, collector: CompletionCollector): pass # nothing to do def _get_completion_model( self, *, is_many: bool = False ) -> "_CompleterSerializer.Model": return _CompleterSerializer.Model()
[docs]@dataclass(frozen=True, **yuio._with_slots()) class Option: """A single completion option for the :class:`Choice` completer.""" #: This string will replace an element that is being completed. completion: str #: Short comment displayed alongside the completion. comment: _t.Optional[str] = None
[docs]class Choice(Completer): """Completes input from a predefined list of completions.""" def __init__(self, choices: _t.Collection[Option], /): self._choices: _t.Collection[Option] = choices def _process(self, collector: CompletionCollector, /): for choice in self._choices: collector.add(choice.completion, comment=choice.comment) def _get_completion_model( self, *, is_many: bool = False ) -> "_CompleterSerializer.Model": return _CompleterSerializer.Choice( [option.completion for option in self._choices] )
[docs]class List(Completer): """Completes a value-separated list of elements.""" def __init__( self, inner: Completer, /, *, delimiter: _t.Optional[str] = None, allow_duplicates: bool = False, ): self._inner = inner if delimiter == "": raise ValueError("empty delimiter") self._delimiter = delimiter self._allow_duplicates = allow_duplicates def _process(self, collector: CompletionCollector, /): collector.split_off_prefix(self._delimiter) collector.split_off_suffix(self._delimiter) collector.rsuffix = self._delimiter or " " collector.rsymbols += self._delimiter or string.whitespace if not self._allow_duplicates: dedup_words = set( collector.iprefix.split(self._delimiter) + collector.isuffix.split(self._delimiter) ) if collector.text in dedup_words: dedup_words.remove(collector.text) collector.dedup_words = frozenset(dedup_words) else: collector.dedup_words = frozenset() self._inner._process(collector) def _get_completion_model( self, *, is_many: bool = False ) -> "_CompleterSerializer.Model": if is_many: return _CompleterSerializer.ListMany( self._delimiter or " ", self._inner._get_completion_model() ) else: return _CompleterSerializer.List( self._delimiter or " ", self._inner._get_completion_model() )
[docs]class Tuple(Completer): """Completes a value-separated tuple of elements.""" def __init__(self, *inners: Completer, delimiter: _t.Optional[str] = None): self._inners = inners if delimiter == "": raise ValueError("empty delimiter") self._delimiter = delimiter def _process(self, collector: CompletionCollector, /): pos = len(collector.prefix.split(self._delimiter)) if ( pos and self._delimiter is None and collector.prefix and collector.prefix[-1] in string.whitespace ): # `.split(None)` will trim whitespaces at the end. # Make sure we count those towards the current position in the tuple. pos += 1 if pos > len(self._inners): pos = len(self._inners) if pos > 0: pos -= 1 collector.split_off_prefix(self._delimiter) collector.split_off_suffix(self._delimiter) collector.rsuffix = self._delimiter or " " collector.rsymbols += self._delimiter or string.whitespace self._inners[pos]._process(collector) def _get_completion_model( self, *, is_many: bool = False ) -> "_CompleterSerializer.Model": if is_many: return _CompleterSerializer.TupleMany( self._delimiter or " ", [inner._get_completion_model() for inner in self._inners], ) else: return _CompleterSerializer.Tuple( self._delimiter or " ", [inner._get_completion_model() for inner in self._inners], )
[docs]class File(Completer): def __init__(self, extensions: _t.Union[str, _t.Collection[str], None] = None): if isinstance(extensions, str): self._extensions = [extensions] elif extensions is not None: self._extensions = list(extensions) else: self._extensions = None def _process(self, collector: CompletionCollector, /): base, name = os.path.split(collector.prefix) if base and not base.endswith(os.path.sep): base += os.path.sep collector.iprefix += base collector.prefix = name collector.suffix = collector.suffix.split(os.sep, maxsplit=1)[0] resolved = pathlib.Path(base).resolve() rsuffix = collector.rsuffix if resolved.is_dir(): if name.startswith("."): collector.rsuffix = "" collector.add("./", color_tag="dir") collector.add("../", color_tag="dir") try: for path in resolved.iterdir(): if path.is_dir(): if path.is_symlink(): color_tag = "symlink" dsuffix = "@" else: color_tag = "dir" dsuffix = "" collector.rsuffix = "" collector.add( path.name + os.sep, color_tag=color_tag, dsuffix=dsuffix ) elif self._extensions is None or any( path.name.endswith(ext) for ext in self._extensions ): collector.rsuffix = rsuffix color_tag = None dsuffix = "" if path.is_file(): if os.access(path, os.X_OK): color_tag = "exec" dsuffix = "*" else: color_tag = "file" elif path.is_symlink(): color_tag = "symlink" dsuffix = "@" elif path.is_socket(): color_tag = "socket" dsuffix = "=" elif path.is_fifo(): color_tag = "pipe" dsuffix = "|" elif path.is_block_device(): color_tag = "block_device" dsuffix = "#" elif path.is_char_device(): color_tag = "char_device" dsuffix = "%" collector.add(path.name, color_tag=color_tag, dsuffix=dsuffix) except PermissionError: return def _get_completion_model( self, *, is_many: bool = False ) -> "_CompleterSerializer.Model": return _CompleterSerializer.File( "|".join(extension.lstrip(".") for extension in self._extensions or []) )
[docs]class Dir(File): def __init__(self): super().__init__([]) def _get_completion_model( self, *, is_many: bool = False ) -> "_CompleterSerializer.Model": return _CompleterSerializer.Dir()
class _CompleterSerializer: def __init__(self): self._subcommands: _t.Dict[ str, _t.Tuple["_CompleterSerializer", bool, str] ] = {} self._positional = 0 self._flags: _t.List[ _t.Tuple[ _t.List[str], _t.Optional[str], _t.Union[str, _t.Tuple[str, ...], None], _t.Union[int, _t.Literal["-", "+", "*", "?"]], "_CompleterSerializer.Model", ] ] = [ ( ["-h", "--help"], "show help and exit", None, "-", _CompleterSerializer.Model(), ) ] def add_argument(self, *args: str, **kwargs): help = kwargs.get("help") or "" if help is argparse.SUPPRESS: return if all(not arg.startswith("-") for arg in args): args = (str(self._positional),) self._positional += 1 action = kwargs.get("action") metavar = kwargs.get("metavar") or "" nargs = kwargs.get( "nargs", 0 if action in [ "store_const", "store_true", "store_false", "append_const", "count", "help", "version", ] else 1, ) if get_parser := getattr(action, "get_parser", None): parser = get_parser() completer = None if get_completer := getattr(action, "get_completer", None): completer = get_completer() if completer is None: completer = parser.completer() completion_model = completer._get_completion_model( is_many=parser.supports_parse_many() ) else: completion_model = self.Model() self._flags.append((list(args), help, metavar, nargs, completion_model)) def add_mutually_exclusive_group(self, *args, **kwargs): return self def add_argument_group(self, *args, **kwargs): return self def add_subparsers(self, *args, **kwargs): return self def add_parser( self, name: str, *, aliases: _t.Sequence[str] = (), help: str, **kwargs, ): serializer = _CompleterSerializer() self._subcommands[name] = (serializer, False, str(help or "")) for alias in aliases: self._subcommands[alias] = (serializer, True, str(help)) return serializer def as_parser(self) -> argparse.ArgumentParser: # We've implemented all methods that `Config._setup_arg_parser` could call. return _t.cast(argparse.ArgumentParser, self) def write_completions(self, prog: _t.Optional[str] = None, shell: str = "all"): import yuio.io prog = prog or pathlib.Path(sys.argv[0]).name yuio.io.heading("Generating completions for `%s`", prog) result = [] self._dump("", result) compdata = "\n".join(result) data_home = pathlib.Path( os.environ.get("XDG_DATA_HOME") or (pathlib.Path.home() / ".local/share") ) cache_home = pathlib.Path( os.environ.get("XDG_CACHE_HOME") or (pathlib.Path.home() / ".cache") ) config_home = pathlib.Path( os.environ.get("XDG_CONFIG_HOME") or (pathlib.Path.home() / ".config") ) os.makedirs(data_home / "yuio", exist_ok=True) compdata_path = data_home / f"yuio/{prog}.compdata.tsv" compdata_path.write_text(compdata) yuio.io.info("Wrote completion data to `%s`", compdata_path) if shell in ["all", "bash"]: self._write_bash_script( prog, compdata_path, data_home, cache_home, config_home ) if shell in ["all", "zsh"]: self._write_zsh_script( prog, compdata_path, data_home, cache_home, config_home ) if shell in ["all", "fish"]: self._write_fish_script( prog, compdata_path, data_home, cache_home, config_home ) yuio.io.success( "All done! Please restart your shell for changes to take effect." ) def _write_bash_script( self, prog: str, compdata_path: pathlib.Path, data_home: pathlib.Path, cache_home: pathlib.Path, config_home: pathlib.Path, ): import yuio.exec import yuio.io try: bash_completions_home = yuio.exec.exec( "bash", "-lic", 'echo -n "${BASH_COMPLETION_USER_DIR:-${XDG_DATA_HOME:-$HOME/.local/share}/bash-completion}/completions/"', ).splitlines()[-1] except subprocess.CalledProcessError: bash_completions_home = data_home / "bash-completion/completions/" bash_completions_home = pathlib.Path(bash_completions_home) os.makedirs(bash_completions_home, exist_ok=True) script_dest = bash_completions_home / prog script_template = (pathlib.Path(__file__).parent / "complete.bash").read_text() script = script_template.replace("@prog@", prog).replace( "@data@", str(compdata_path) ) script_dest.write_text(script) yuio.io.info("Wrote bash script to `%s`", script_dest) def _write_zsh_script( self, prog: str, compdata_path: pathlib.Path, data_home: pathlib.Path, cache_home: pathlib.Path, config_home: pathlib.Path, ): import yuio.exec import yuio.io zsh_completions_home = data_home / "zsh/completions" os.makedirs(zsh_completions_home, exist_ok=True) script_dest = zsh_completions_home / ("_" + prog) script_template = (pathlib.Path(__file__).parent / "complete.zsh").read_text() script = script_template.replace("@prog@", prog).replace( "@data@", str(compdata_path) ) script_dest.write_text(script) yuio.io.info("Wrote zsh script to `%s`", script_dest) try: fpath = ( yuio.exec.exec( "zsh", "-lic", "echo -n $FPATH", ) .splitlines()[-1] .split(":") ) except subprocess.CalledProcessError: fpath = [] try: zhome = yuio.exec.exec( "zsh", "-lic", "echo -n ${ZDOTDIR:-$HOME}", ).splitlines()[-1] except subprocess.CalledProcessError: zhome = pathlib.Path.home() zhome = pathlib.Path(zhome) if str(zsh_completions_home) not in fpath: zprofile_path = zhome / ".zprofile" with open(zprofile_path, "a") as f: f.write( f"\n# Generated by Yuio, a python CLI library.\n" f"fpath=({zsh_completions_home} $fpath)\n" ) yuio.io.info( "Modified `%s` to add `%s` to `fpath`", zprofile_path, zsh_completions_home, ) # Try to remove completions cache from the most common places. for zcomp_basedir in [zhome, cache_home / "prezto"]: if not zcomp_basedir.exists() or not zcomp_basedir.is_dir(): continue for file in zcomp_basedir.iterdir(): if file.is_file() and re.match(r"^\.?zcompdump", file.name): os.remove(file) yuio.io.info("Deleted zsh completions cache at `%s`", file) try: # Run zsh with the right flags in case zshrc runs compinit. # If after generating completions user runs `zsh` without the `-l` flag, # our changes to fpath will not be visible, and compinit will dump # an invalid version of cache. To avoid this, we call zsh ourselves # before the user has a chance to do it. Notice, though, that we don't # run `compdump`. This is because we can't be sure that the user uses # the default cache path (~/.zcompdump). yuio.exec.exec("zsh", "-lic", "true") except subprocess.CalledProcessError: pass def _write_fish_script( self, prog: str, compdata_path: pathlib.Path, data_home: pathlib.Path, cache_home: pathlib.Path, config_home: pathlib.Path, ): import yuio.io fish_completions_home = data_home / "fish/vendor_completions.d" os.makedirs(fish_completions_home, exist_ok=True) script_dest = fish_completions_home / (prog + ".fish") script_template = (pathlib.Path(__file__).parent / "complete.fish").read_text() script = script_template.replace("@prog@", prog).replace( "@data@", str(compdata_path) ) script_dest.write_text(script) yuio.io.info("Wrote fish script to `%s`", script_dest) _SPECIAL_SYMBOLS = str.maketrans("\r\n\a\b\t", " ") def _dump(self, path: str, result: _t.List[str]): if self._subcommands: self._flags.append( ( ["c"], "subcommand", "<cmd>", 1, _CompleterSerializer.ChoiceWithDescriptions( [ (name, help) for name, (_, is_alias, help) in self._subcommands.items() if not is_alias ] ), ) ) for opts, desc, meta, nargs, completer in self._flags: if not isinstance(meta, tuple): meta = (meta,) compspec: _t.List[str] = [ path, " ".join(opts), desc or "", " ".join( re.sub( r"[\\ ]", lambda s: "\\S" if s.group() == " " else f"\\L", str(m), ) or "" for m in meta ), str(nargs), *completer.dump(), ] result.append( "\t".join(item.translate(self._SPECIAL_SYMBOLS) for item in compspec) ) for subcommand, (serializer, *_) in self._subcommands.items(): serializer._dump(f"{path}/{subcommand}", result) @staticmethod def _dump_nested(compspec: _t.List[object]) -> _t.List[str]: contents = [] for item in compspec: contents.extend(_CompleterSerializer._dump_nested_item(item)) return contents @staticmethod def _dump_nested_item(item: object) -> _t.List[str]: contents = [] if isinstance(item, _CompleterSerializer.Model): contents.extend(item.dump()) elif isinstance(item, list): contents.append(str(len(item))) for sub_item in item: contents.extend(_CompleterSerializer._dump_nested_item(sub_item)) elif isinstance(item, tuple): for sub_item in item: contents.extend(_CompleterSerializer._dump_nested_item(sub_item)) else: contents.append(str(item)) return contents class ModelBase: tag: _t.ClassVar[str] = "-" def __init_subclass__(cls, tag: str = "-", **kwargs): super(cls).__init_subclass__(**kwargs) cls.tag = tag @dataclass() class Model(ModelBase): def dump(self) -> _t.List[str]: compspec = [getattr(self, field.name) for field in dataclasses.fields(self)] contents = _CompleterSerializer._dump_nested(compspec) return [self.tag, str(len(contents)), *contents] @dataclass() class File(Model, tag="f"): ext: str @dataclass() class Dir(Model, tag="d"): pass @dataclass() class Choice(Model, tag="c"): choices: _t.List[str] def dump(self) -> _t.List[str]: return [self.tag, str(len(self.choices)), *self.choices] @dataclass() class ChoiceWithDescriptions(Model, tag="cd"): choices: _t.List[_t.Tuple[str, str]] def dump(self) -> _t.List[str]: return [ self.tag, str(len(self.choices) * 2), *[c[0] for c in self.choices], *[c[1] for c in self.choices], ] @dataclass() class Git(Model, tag="g"): class Mode(enum.Enum): Branch = "b" Remote = "r" Tag = "t" Head = "h" modes: _t.Set[Mode] = dataclasses.field( default_factory=lambda: { _CompleterSerializer.Git.Mode.Branch, _CompleterSerializer.Git.Mode.Tag, _CompleterSerializer.Git.Mode.Head, } ) def dump(self) -> _t.List[str]: return [self.tag, "1", "".join(mode.value for mode in self.modes)] @dataclass() class List(Model, tag="l"): delim: str inner: "_CompleterSerializer.Model" @dataclass() class ListMany(List, tag="lm"): pass @dataclass() class Tuple(Model, tag="t"): delim: str inner: _t.List["_CompleterSerializer.Model"] @dataclass() class TupleMany(Tuple, tag="tm"): pass @dataclass() class Alternative(Model, tag="a"): alternatives: _t.List[_t.Tuple[str, "_CompleterSerializer.Model"]] @dataclass() class CustomCompleter(Model, tag="cc"): completer: Completer def dump(self) -> _t.List[str]: return [ self.tag, "1", base64.b64encode(pickle.dumps(self.completer)).decode(), ] def _run_custom_completer(data: str, word: str): completer: Completer = pickle.loads(base64.b64decode(data)) completions = completer.complete(word, len(word), do_corrections=False) for completion in completions: print( f"{completion.iprefix}{completion.completion}{completion.isuffix}\t{completion.comment or ''}", file=sys.__stdout__, )