Coverage for yuio / term.py: 69%
461 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-29 19:55 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-29 19:55 +0000
1# Yuio project, MIT license.
2#
3# https://github.com/taminomara/yuio/
4#
5# You're free to copy this file to your project and edit it for your needs,
6# just keep this copyright line please :3
8"""
9Querying terminal info and working with ANSI terminals.
11This is a low-level module upon which :mod:`yuio.io` builds
12its higher-level abstraction.
15Detecting terminal capabilities
16-------------------------------
18Terminal capabilities are stored in a :class:`Term` object.
20Usually, you don't need to query terminal capabilities yourself,
21as you can use Yuio's global configuration from :mod:`yuio.io`
22(see :func:`yuio.io.get_term`).
24However, you can get a :class:`Term` object by using :func:`get_term_from_stream`:
26.. autofunction:: get_term_from_stream
28.. autofunction:: get_tty
30.. autoclass:: Term
31 :members:
33.. autoclass:: TerminalTheme
34 :members:
36.. autoclass:: Lightness
37 :members:
40Utilities
41---------
43.. autofunction:: stream_is_unicode
45.. autofunction:: get_tty_size
47.. autofunction:: detect_ci
49.. autofunction:: detect_ci_color_support
52Re-imports
53----------
55.. type:: ColorSupport
56 :no-index:
58 Alias of :obj:`yuio.color.ColorSupport`.
60"""
62from __future__ import annotations
64import atexit
65import contextlib
66import dataclasses
67import enum
68import io
69import locale
70import os
71import re
72import shutil
73import sys
74import threading
75from dataclasses import dataclass
77import yuio
78import yuio.color
79from yuio.color import ColorSupport
80from yuio.util import ClosedIO as _ClosedIO
82from typing import TYPE_CHECKING
84if TYPE_CHECKING:
85 import typing_extensions as _t
86else:
87 from yuio import _typing as _t
89__all__ = [
90 "ColorSupport",
91 "Lightness",
92 "Term",
93 "TerminalTheme",
94 "detect_ci",
95 "detect_ci_color_support",
96 "get_term_from_stream",
97 "get_tty",
98 "get_tty_size",
99 "stream_is_unicode",
100]
102T = _t.TypeVar("T")
105_LOCK = threading.Lock()
107# These variables are set in `_prepare_tty`.
108_TTY_SETUP_PERFORMED: bool = False
109_TTY_OUTPUT: _t.TextIO | None
110_TTY_INPUT: _t.TextIO | None
111_TERMINAL_THEME: TerminalTheme | None
112_EXPLICIT_COLOR_SUPPORT: ColorSupport | bool | None
113_COLOR_SUPPORT: ColorSupport
116# Redefine canonical streams so that we don't monkeypatch `sys.__std*__` in tests.
117__stdin = sys.__stdin__
118__stdout = sys.__stdout__
119__stderr = sys.__stderr__
122class Lightness(enum.Enum):
123 """
124 Overall color theme of a terminal.
126 Can help with deciding which colors to use when printing output.
128 """
130 UNKNOWN = enum.auto()
131 """
132 We couldn't determine terminal background, or it wasn't dark
133 or bright enough to fall in one category or another.
135 """
137 DARK = enum.auto()
138 """
139 Terminal background is dark.
141 """
143 LIGHT = enum.auto()
144 """
145 Terminal background is light.
147 """
150@dataclass(frozen=True, slots=True)
151class TerminalTheme:
152 """
153 Colors and theme of the attached terminal.
155 """
157 background: yuio.color.ColorValue
158 """
159 Background color of a terminal.
161 """
163 foreground: yuio.color.ColorValue
164 """
165 Foreground color of a terminal.
167 """
169 black: yuio.color.ColorValue
170 """
171 Color value for the default "black" color.
173 """
175 bright_black: yuio.color.ColorValue
176 """
177 Color value for the default "bright black" color.
179 """
181 red: yuio.color.ColorValue
182 """
183 Color value for the default "red" color.
185 """
187 bright_red: yuio.color.ColorValue
188 """
189 Color value for the default "bright red" color.
191 """
193 green: yuio.color.ColorValue
194 """
195 Color value for the default "green" color.
197 """
199 bright_green: yuio.color.ColorValue
200 """
201 Color value for the default "bright green" color.
203 """
205 yellow: yuio.color.ColorValue
206 """
207 Color value for the default "yellow" color.
209 """
211 bright_yellow: yuio.color.ColorValue
212 """
213 Color value for the default "bright yellow" color.
215 """
217 blue: yuio.color.ColorValue
218 """
219 Color value for the default "blue" color.
221 """
223 bright_blue: yuio.color.ColorValue
224 """
225 Color value for the default "bright blue" color.
227 """
229 magenta: yuio.color.ColorValue
230 """
231 Color value for the default "magenta" color.
233 """
235 bright_magenta: yuio.color.ColorValue
236 """
237 Color value for the default "bright magenta" color.
239 """
241 cyan: yuio.color.ColorValue
242 """
243 Color value for the default "cyan" color.
245 """
247 bright_cyan: yuio.color.ColorValue
248 """
249 Color value for the default "bright cyan" color.
251 """
253 white: yuio.color.ColorValue
254 """
255 Color value for the default "white" color.
257 """
259 bright_white: yuio.color.ColorValue
260 """
261 Color value for the default "bright white" color.
263 """
265 lightness: Lightness
266 """
267 Overall color theme of a terminal, i.e. dark or light.
269 """
272@dataclass(frozen=True, slots=True)
273class Term:
274 """
275 This class contains all info about what kinds of things the terminal
276 supports. If available, it will also have info about terminal's theme,
277 i.e. dark or light background, etc.
279 """
281 ostream: _t.TextIO
282 """
283 Terminal's output stream.
285 """
287 istream: _t.TextIO
288 """
289 Terminal's input stream.
291 """
293 color_support: ColorSupport = dataclasses.field(
294 default=ColorSupport.NONE, kw_only=True
295 )
296 """
297 Terminal's capability for coloring output.
299 """
301 ostream_is_tty: bool = dataclasses.field(default=False, kw_only=True)
302 """
303 Output is connecter to a terminal, and we're not in CI.
305 Note that output being connected to a TTY doesn't mean that it's interactive:
306 this process can be in background.
308 """
310 istream_is_tty: bool = dataclasses.field(default=False, kw_only=True)
311 """
312 Output is connecter to a terminal, and we're not in CI.
314 Note that output being connected to a TTY doesn't mean that it's interactive:
315 this process can be in background.
317 """
319 terminal_theme: TerminalTheme | None = dataclasses.field(default=None, kw_only=True)
320 """
321 Terminal's default foreground, background, and text colors.
323 """
325 is_unicode: bool = dataclasses.field(default=False, kw_only=True)
326 """
327 Terminal's output supports unicode characters.
329 """
331 @property
332 def supports_colors(self) -> bool:
333 """
334 Return :data:`True` if terminal supports simple 8-bit color codes.
336 """
338 return self.color_support >= ColorSupport.ANSI
340 @property
341 def supports_colors_256(self) -> bool:
342 """
343 Return :data:`True` if terminal supports 256-encoded colors.
345 """
347 return self.color_support >= ColorSupport.ANSI_256
349 @property
350 def supports_colors_true(self) -> bool:
351 """
352 Return :data:`True` if terminal supports true colors.
354 """
356 return self.color_support >= ColorSupport.ANSI_TRUE
358 @property
359 def is_tty(self) -> bool:
360 """
361 Return :data:`True` if input and output are connected to a TTY. In this mode
362 we can interact with the user by writing and reading lines of text.
364 """
366 return self.istream_is_tty and self.ostream_is_tty
368 @property
369 def can_run_widgets(self) -> bool:
370 """
371 Return :data:`True` if input and output are interactive and colors
372 are supported. In this mode we can run interactive widgets.
374 """
376 return self.color_support >= ColorSupport.ANSI and self.is_tty
378 @staticmethod
379 def make_dummy(is_unicode: bool = True) -> Term:
380 """
381 Make a dummy terminal with closed streams and no capabilities.
383 """
385 stream = io.StringIO()
386 stream.close()
387 return Term(
388 istream=_ClosedIO(),
389 ostream=_ClosedIO(),
390 is_unicode=is_unicode,
391 )
394_CI_ENV_VARS = [
395 "TRAVIS",
396 "CIRCLECI",
397 "APPVEYOR",
398 "GITLAB_CI",
399 "BUILDKITE",
400 "DRONE",
401 "TEAMCITY_VERSION",
402 "GITHUB_ACTIONS",
403]
406def stream_is_unicode(stream: _t.TextIO, /) -> bool:
407 """
408 Determine of stream's encoding is some version of unicode.
410 """
412 encoding = getattr(stream, "encoding", None) or locale.getpreferredencoding() or ""
413 encoding = encoding.casefold()
414 return "utf" in encoding or "unicode" in encoding
417def get_tty_size(fallback: tuple[int, int] = (80, 24)):
418 """
419 Like :func:`shutil.get_terminal_size`, but uses TTY stream if it's available.
421 :param fallback:
422 tuple with width and height that will be used if query fails.
424 """
426 _prepare_tty()
428 try:
429 columns = int(os.environ["COLUMNS"])
430 except (KeyError, ValueError):
431 columns = 0
433 try:
434 lines = int(os.environ["LINES"])
435 except (KeyError, ValueError):
436 lines = 0
438 if columns <= 0 or lines <= 0:
439 try:
440 size = os.get_terminal_size(_TTY_OUTPUT.fileno()) # type: ignore
441 except (AttributeError, ValueError, OSError):
442 # stream is closed, detached, or not a terminal, or
443 # os.get_tty_size() is unsupported
444 size = os.terminal_size(fallback)
445 if columns <= 0:
446 columns = size.columns or fallback[0]
447 if lines <= 0:
448 lines = size.lines or fallback[1]
450 return os.terminal_size((columns, lines))
453_IN_CI = None
456def detect_ci() -> bool:
457 """
458 Scan environment variables to detect if we're in a known CI environment.
460 """
462 global _IN_CI
464 if _IN_CI is None:
465 _IN_CI = "CI" in os.environ or any(ci in os.environ for ci in _CI_ENV_VARS)
467 return _IN_CI
470def detect_ci_color_support() -> ColorSupport:
471 """
472 Scan environment variables to detect an appropriate level of color support
473 of a CI environment.
475 If we're not in CI, return :attr:`ColorSupport.NONE <yuio.color.ColorSupport.NONE>`.
477 """
479 if "GITHUB_ACTIONS" in os.environ:
480 return ColorSupport.ANSI_TRUE
481 elif any(ci in os.environ for ci in _CI_ENV_VARS):
482 return ColorSupport.ANSI
483 else:
484 return ColorSupport.NONE
487def get_tty() -> Term:
488 """
489 Query info about TTY.
491 On Unix, this returns terminal connected to ``/dev/tty``. On Windows, this returns
492 terminal connected to ``CONIN$``/``CONOUT$``.
494 If opening any of these fails, returns :class:`Term` with ``stdin``/``stdout``
495 as a fallback.
497 .. note::
499 Prefer using ``stderr`` for normal IO: your users expect to be able to redirect
500 messages from your program.
502 Only use ``/dev/tty`` for querying passwords or other things that must not
503 be redirected.
505 """
507 _prepare_tty()
508 ostream = _TTY_OUTPUT or __stderr
509 istream = _TTY_INPUT or __stdin
510 assert ostream is not None
511 assert istream is not None
512 return get_term_from_stream(ostream, istream, allow_env_overrides=True)
515def get_term_from_stream(
516 ostream: _t.TextIO,
517 istream: _t.TextIO | None = None,
518 /,
519 *,
520 allow_env_overrides: bool = False,
521) -> Term:
522 """
523 Query info about a terminal attached to the given stream.
525 :param ostream:
526 output stream.
527 :param istream:
528 input stream. If not given, widgets will not work with this terminal.
529 :param allow_env_overrides:
530 honor environment variables and CLI flags when determining capabilities
531 of streams.
533 """
535 is_unicode = stream_is_unicode(ostream)
537 if (
538 # For building docs in github.
539 "YUIO_FORCE_FULL_TERM_SUPPORT" in os.environ and istream is not None
540 ): # pragma: no cover
541 return Term(
542 ostream=ostream,
543 istream=istream,
544 color_support=ColorSupport.ANSI_TRUE,
545 ostream_is_tty=True,
546 istream_is_tty=True,
547 is_unicode=is_unicode,
548 )
550 _prepare_tty()
552 output_is_tty = _output_is_tty(ostream)
553 input_is_tty = _input_is_tty(istream)
554 in_ci = detect_ci()
556 # Detect colors.
557 if output_is_tty or (_EXPLICIT_COLOR_SUPPORT is not None and allow_env_overrides):
558 color_support = _COLOR_SUPPORT
559 else:
560 color_support = ColorSupport.NONE
562 if istream is None:
563 istream = _ClosedIO()
565 return Term(
566 ostream=ostream,
567 istream=istream,
568 color_support=color_support,
569 ostream_is_tty=output_is_tty and not in_ci,
570 istream_is_tty=input_is_tty and not in_ci,
571 terminal_theme=_TERMINAL_THEME,
572 is_unicode=is_unicode,
573 )
576def _prepare_tty():
577 if not _TTY_SETUP_PERFORMED:
578 with _LOCK:
579 if not _TTY_SETUP_PERFORMED:
580 _do_prepare_tty()
583def _do_prepare_tty():
584 global \
585 _TTY_SETUP_PERFORMED, \
586 _TERMINAL_THEME, \
587 _EXPLICIT_COLOR_SUPPORT, \
588 _COLOR_SUPPORT
590 _find_tty()
592 _TTY_SETUP_PERFORMED = True
594 # Theme is `None` for now, will query it later.
595 _TERMINAL_THEME = None
597 # Find out if user specified `--color` or `FORCE_COLOR`.
598 _EXPLICIT_COLOR_SUPPORT = _detect_explicit_color_settings()
600 # Check user preferences.
601 if _EXPLICIT_COLOR_SUPPORT is False:
602 # Colors disabled, nothing more to do.
603 _COLOR_SUPPORT = ColorSupport.NONE
604 return
605 elif _EXPLICIT_COLOR_SUPPORT is True:
606 # At least ANSI. Might improve later.
607 _COLOR_SUPPORT = max(ColorSupport.ANSI, _detect_color_support_from_env())
608 elif _EXPLICIT_COLOR_SUPPORT is None:
609 # At least NONE. Might improve later.
610 _COLOR_SUPPORT = _detect_color_support_from_env()
611 else:
612 # Exact color support is given.
613 _COLOR_SUPPORT = _EXPLICIT_COLOR_SUPPORT
615 if _TTY_OUTPUT is None:
616 # Can't find attached TTY output, hence can't improve color support.
617 return
619 if os.name == "nt":
620 # Try enabling true colors.
621 if _enable_vt_processing(_TTY_OUTPUT):
622 # Success, can improve color support.
623 if _EXPLICIT_COLOR_SUPPORT is None or _EXPLICIT_COLOR_SUPPORT is True:
624 _COLOR_SUPPORT = ColorSupport.ANSI_TRUE
625 else:
626 # Failure, this version of Windows does not support colors.
627 return
629 if _TTY_INPUT is None:
630 # Can't find attached TTY input, hence can't improve color support.
631 return
633 if not _is_foreground(_TTY_OUTPUT) or not _is_foreground(_TTY_INPUT):
634 # We're not a foreground process, we won't be able to fetch colors.
635 return
636 if detect_ci():
637 # We're in CI, we won't be able to fetch colors.
638 return
639 if not _is_tty(__stdin):
640 # We don't want to query colors if our stdin is redirected: this is a sign
641 # that this program runs in some sort of a pipeline, and multiple instances
642 # of it might run at the same time. If this happens, several processes/threads
643 # can interact with the same TTY, leading to garbled output.
644 return
646 if _COLOR_SUPPORT >= ColorSupport.ANSI:
647 # We were able to find TTY, and colors are supported.
648 # Try fetching terminal theme.
649 _TERMINAL_THEME = _get_standard_colors(_TTY_OUTPUT, _TTY_INPUT)
652def _find_tty():
653 global _TTY_OUTPUT, _TTY_INPUT
655 _TTY_OUTPUT = _TTY_INPUT = None
657 closer = contextlib.ExitStack()
658 try:
659 if os.name == "nt":
660 file_io_in = io._WindowsConsoleIO("CONIN$", "r") # type: ignore
661 tty_in = closer.enter_context(
662 io.TextIOWrapper(file_io_in, encoding="utf-8")
663 )
664 file_io_out = io._WindowsConsoleIO("CONOUT$", "w") # type: ignore
665 tty_out = closer.enter_context(
666 io.TextIOWrapper(file_io_out, encoding="utf-8")
667 )
668 else:
669 fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY)
670 file_io = io.FileIO(fd, "w+")
671 file_io.name = "/dev/tty"
672 tty_in = tty_out = closer.enter_context(io.TextIOWrapper(file_io))
673 except (OSError, AttributeError):
674 closer.close()
675 except:
676 closer.close()
677 raise
678 else:
679 atexit.register(closer.close)
680 _TTY_INPUT = tty_in
681 _TTY_OUTPUT = tty_out
682 return
684 for stream in (__stderr, __stdout):
685 if stream is not None and _output_is_tty(stream):
686 _TTY_OUTPUT = stream
687 break
688 if __stdin is not None and _input_is_tty(__stdin):
689 _TTY_INPUT = __stdin
692def _get_standard_colors(
693 ostream: _t.TextIO, istream: _t.TextIO
694) -> TerminalTheme | None:
695 if "YUIO_DISABLE_OSC_QUERIES" in os.environ:
696 return None
698 try:
699 query = "\x1b]10;?\x1b\\\x1b]11;?\x1b\\" + "".join(
700 [f"\x1b]4;{i};?\x1b\\" for i in range(16)]
701 )
702 response = _query_term(ostream, istream, query)
703 if not response:
704 return None
706 # Deal with foreground color.
708 match = re.match(
709 r"^\x1b]10;rgb:([0-9a-f]{2,4})/([0-9a-f]{2,4})/([0-9a-f]{2,4})(?:\x1b\\|\a)",
710 response,
711 re.IGNORECASE,
712 )
713 if match is None: # pragma: no cover
714 return None
716 r, g, b = (int(v, 16) // 16 ** (len(v) - 2) for v in match.groups())
717 foreground = yuio.color.ColorValue.from_rgb(r, g, b)
719 response = response[match.end() :]
721 # Deal with background color.
723 match = re.match(
724 r"^\x1b]11;rgb:([0-9a-f]{2,4})/([0-9a-f]{2,4})/([0-9a-f]{2,4})(?:\x1b\\|\a)",
725 response,
726 re.IGNORECASE,
727 )
728 if match is None: # pragma: no cover
729 return None
731 r, g, b = (int(v, 16) // 16 ** (len(v) - 2) for v in match.groups())
732 background = yuio.color.ColorValue.from_rgb(r, g, b)
733 luma = (0.2627 * r + 0.6780 * g + 0.0593 * b) / 256
735 if luma <= 0.2:
736 lightness = Lightness.DARK
737 elif luma >= 0.85:
738 lightness = Lightness.LIGHT
739 else:
740 lightness = Lightness.UNKNOWN
742 response = response[match.end() :]
744 # Deal with other colors
746 colors = {}
748 while response:
749 match = re.match(
750 r"^\x1b]4;(\d+);rgb:([0-9a-f]{2,4})/([0-9a-f]{2,4})/([0-9a-f]{2,4})(?:\x1b\\|\a)",
751 response,
752 re.IGNORECASE,
753 )
754 if match is None: # pragma: no cover
755 return None
757 c = int(match.group(1))
758 r, g, b = (int(v, 16) // 16 ** (len(v) - 2) for v in match.groups()[1:])
759 colors[c] = yuio.color.ColorValue.from_rgb(r, g, b)
761 response = response[match.end() :]
763 for k in range(8):
764 if k not in colors:
765 return None
767 # return colors
768 return TerminalTheme(
769 background=background,
770 foreground=foreground,
771 black=colors[0],
772 bright_black=colors.get(8 + 0, colors[0]),
773 red=colors[1],
774 bright_red=colors.get(8 + 1, colors[1]),
775 green=colors[2],
776 bright_green=colors.get(8 + 2, colors[2]),
777 yellow=colors[3],
778 bright_yellow=colors.get(8 + 3, colors[3]),
779 blue=colors[4],
780 bright_blue=colors.get(8 + 4, colors[4]),
781 magenta=colors[5],
782 bright_magenta=colors.get(8 + 5, colors[5]),
783 cyan=colors[6],
784 bright_cyan=colors.get(8 + 6, colors[6]),
785 white=colors[7],
786 bright_white=colors.get(8 + 7, colors[7]),
787 lightness=lightness,
788 )
790 except Exception: # pragma: no cover
791 return None
794def _query_term(ostream: _t.TextIO, istream: _t.TextIO, query: str) -> str | None:
795 try:
796 # Lock the keyboard.
797 ostream.write("\x1b[2h")
798 ostream.flush()
800 with _enter_raw_mode(ostream, istream):
801 _flush_input_buffer(ostream, istream)
803 # It is important that we unlock keyboard before exiting `cbreak`,
804 # hence the nested `try`.
805 try:
806 # Append a DA1 query, as virtually all terminals support it.
807 ostream.write(query + "\x1b[c")
808 ostream.flush()
810 buf = _read_keycode(ostream, istream, timeout=0.250)
811 if not buf.startswith("\x1b"):
812 yuio._logger.warning("_query_term invalid response: %r", buf)
813 return None
815 # Read till we find a DA1 response.
816 while not re.search(r"\x1b\[\?.*?c", buf):
817 buf += _read_keycode(ostream, istream, timeout=0.250)
819 return buf[: buf.index("\x1b[?")]
820 finally:
821 _flush_input_buffer(ostream, istream)
823 # Release the keyboard.
824 ostream.write("\x1b[2i")
825 ostream.flush()
826 except Exception: # pragma: no cover
827 yuio._logger.warning("_query_term error", exc_info=True)
828 return None
829 finally:
830 # Release the keyboard.
831 ostream.write("\x1b[2i")
832 ostream.flush()
835def _detect_explicit_color_settings() -> ColorSupport | bool | None:
836 color_support = None
838 if "FORCE_COLOR" in os.environ:
839 color_support = True
841 if "NO_COLOR" in os.environ or "FORCE_NO_COLOR" in os.environ:
842 color_support = False
844 # Note: we don't rely on argparse to parse flags and send them to us
845 # because these functions can be called before parsing arguments.
846 for arg in sys.argv[1:]:
847 if arg in ("--color", "--force-color"):
848 color_support = True
849 elif arg in ("--no-color", "--force-no-color"):
850 color_support = False
851 elif arg.startswith(("--color=", "--colors=")):
852 value = (
853 arg.split("=", maxsplit=1)[1]
854 .replace("_", "")
855 .replace("-", "")
856 .casefold()
857 )
858 if value in ["1", "yes", "true"]:
859 color_support = True
860 elif value in ["0", "no", "false"]:
861 color_support = False
862 elif value == "ansi":
863 color_support = ColorSupport.ANSI
864 elif value == "ansi256":
865 color_support = ColorSupport.ANSI_256
866 elif value == "ansitrue":
867 color_support = ColorSupport.ANSI_TRUE
869 return color_support
872def _detect_color_support_from_env() -> ColorSupport:
873 term = os.environ.get("TERM", "").lower()
874 colorterm = os.environ.get("COLORTERM", "").lower()
876 if detect_ci():
877 return detect_ci_color_support()
878 elif os.name == "nt":
879 return ColorSupport.NONE
880 elif colorterm in ("truecolor", "24bit") or term == "xterm-kitty":
881 return ColorSupport.ANSI_TRUE
882 elif colorterm in ("yes", "true") or "256color" in term or term == "screen":
883 if os.name == "posix" and term == "xterm-256color" and shutil.which("wslinfo"):
884 return ColorSupport.ANSI_TRUE
885 else:
886 return ColorSupport.ANSI_256
887 elif "linux" in term or "color" in term or "ansi" in term or "xterm" in term:
888 return ColorSupport.ANSI
890 return ColorSupport.NONE
893def _is_tty(stream: _t.TextIO | None) -> bool:
894 try:
895 return stream is not None and stream.isatty()
896 except Exception: # pragma: no cover
897 return False
900def _input_is_tty(stream: _t.TextIO | None) -> bool:
901 try:
902 return stream is not None and _is_tty(stream) and stream.readable()
903 except Exception: # pragma: no cover
904 return False
907def _output_is_tty(stream: _t.TextIO | None) -> bool:
908 try:
909 return stream is not None and _is_tty(stream) and stream.writable()
910 except Exception: # pragma: no cover
911 return False
914@contextlib.contextmanager
915def _modify_keyboard(
916 ostream: _t.TextIO,
917 bracketed_paste: bool = False,
918 modify_keyboard: bool = False,
919):
920 prologue = []
921 if bracketed_paste:
922 prologue.append("\x1b[?2004h")
923 if modify_keyboard:
924 prologue.append("\x1b[>1u")
925 if prologue:
926 ostream.write("".join(prologue))
927 ostream.flush()
928 try:
929 yield
930 finally:
931 epilog = []
932 if bracketed_paste:
933 epilog.append("\x1b[?2004l")
934 epilog.append("\x1b[<u")
935 if epilog:
936 ostream.write("".join(epilog))
937 ostream.flush()
940# Platform-specific code for working with terminals.
941if os.name == "posix":
942 import select
943 import signal
944 import termios
945 import tty
947 def _is_foreground(stream: _t.TextIO | None) -> bool:
948 try:
949 return stream is not None and os.getpgrp() == os.tcgetpgrp(stream.fileno())
950 except Exception: # pragma: no cover
951 return False
953 @contextlib.contextmanager
954 def _enter_raw_mode(
955 ostream: _t.TextIO,
956 istream: _t.TextIO,
957 bracketed_paste: bool = False,
958 modify_keyboard: bool = False,
959 ):
960 prev_mode = termios.tcgetattr(istream)
961 new_mode = prev_mode.copy()
962 new_mode[tty.LFLAG] &= ~(
963 termios.ECHO # Don't print back what user types.
964 | termios.ICANON # Disable line editing.
965 | termios.ISIG # Disable signals on C-c and C-z.
966 )
967 new_mode[tty.CC] = new_mode[tty.CC].copy()
968 new_mode[tty.CC][termios.VMIN] = 1
969 new_mode[tty.CC][termios.VTIME] = 0
970 termios.tcsetattr(istream, termios.TCSAFLUSH, new_mode)
972 try:
973 with _modify_keyboard(ostream, bracketed_paste, modify_keyboard):
974 yield
975 finally:
976 termios.tcsetattr(istream, termios.TCSAFLUSH, prev_mode)
978 def _read_keycode(
979 ostream: _t.TextIO, istream: _t.TextIO, timeout: float = 0
980 ) -> str:
981 if timeout and not bool(select.select([istream], [], [], timeout)[0]):
982 raise TimeoutError()
983 key = os.read(istream.fileno(), 128)
984 while bool(select.select([istream], [], [], 0)[0]):
985 key += os.read(istream.fileno(), 128)
987 return key.decode(istream.encoding, errors="replace")
989 def _flush_input_buffer(ostream: _t.TextIO, istream: _t.TextIO):
990 pass
992 def _enable_vt_processing(ostream: _t.TextIO) -> bool:
993 return False # This is a windows functionality
995 def _pause():
996 os.kill(os.getpid(), signal.SIGTSTP)
998elif os.name == "nt":
999 import ctypes
1000 import ctypes.wintypes
1001 import msvcrt
1003 _FlushConsoleInputBuffer = ctypes.windll.kernel32.FlushConsoleInputBuffer
1004 _FlushConsoleInputBuffer.argtypes = [ctypes.wintypes.HANDLE]
1005 _FlushConsoleInputBuffer.restype = ctypes.wintypes.BOOL
1007 _GetConsoleMode = ctypes.windll.kernel32.GetConsoleMode
1008 _GetConsoleMode.argtypes = [ctypes.wintypes.HANDLE, ctypes.wintypes.LPDWORD]
1009 _GetConsoleMode.restype = ctypes.wintypes.BOOL
1011 _SetConsoleMode = ctypes.windll.kernel32.SetConsoleMode
1012 _SetConsoleMode.argtypes = [ctypes.wintypes.HANDLE, ctypes.wintypes.DWORD]
1013 _SetConsoleMode.restype = ctypes.wintypes.BOOL
1015 _GetConsoleWindow = ctypes.windll.kernel32.GetConsoleWindow
1016 _GetConsoleWindow.argtypes = []
1017 _GetConsoleWindow.restype = ctypes.wintypes.HWND
1019 _IsWindowVisible = ctypes.windll.user32.IsWindowVisible
1020 _IsWindowVisible.argtypes = [ctypes.wintypes.HWND]
1021 _SetConsoleMode.restype = ctypes.wintypes.BOOL
1023 _ReadConsoleW = ctypes.windll.kernel32.ReadConsoleW
1024 _ReadConsoleW.argtypes = [
1025 ctypes.wintypes.HANDLE,
1026 ctypes.wintypes.LPVOID,
1027 ctypes.wintypes.DWORD,
1028 ctypes.wintypes.LPDWORD,
1029 ctypes.wintypes.LPVOID,
1030 ]
1031 _ReadConsoleW.restype = ctypes.wintypes.BOOL
1033 _ENABLE_PROCESSED_OUTPUT = 0x0001
1034 _ENABLE_WRAP_AT_EOL_OUTPUT = 0x0002
1035 _ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x0004
1036 _ENABLE_VIRTUAL_TERMINAL_INPUT = 0x0200
1038 _ISTREAM_HANDLE = None
1040 def _is_foreground(stream: _t.TextIO | None) -> bool:
1041 window = _GetConsoleWindow()
1042 if not window:
1043 return False
1044 return _IsWindowVisible(window)
1046 @contextlib.contextmanager
1047 def _enter_raw_mode(
1048 ostream: _t.TextIO,
1049 istream: _t.TextIO,
1050 bracketed_paste: bool = False,
1051 modify_keyboard: bool = False,
1052 ):
1053 global _ISTREAM_HANDLE
1055 if _ISTREAM_HANDLE is None:
1056 _prepare_tty()
1057 _ISTREAM_HANDLE = msvcrt.get_osfhandle((_TTY_INPUT or __stdin).fileno()) # type: ignore
1059 mode = ctypes.wintypes.DWORD()
1060 success = _GetConsoleMode(_ISTREAM_HANDLE, ctypes.byref(mode))
1061 if not success:
1062 raise ctypes.WinError()
1063 success = _SetConsoleMode(_ISTREAM_HANDLE, _ENABLE_VIRTUAL_TERMINAL_INPUT)
1064 if not success:
1065 raise ctypes.WinError()
1067 try:
1068 with _modify_keyboard(ostream, bracketed_paste, modify_keyboard):
1069 yield
1070 finally:
1071 success = _SetConsoleMode(_ISTREAM_HANDLE, mode)
1072 if not success:
1073 raise ctypes.WinError()
1075 def _read_keycode(
1076 ostream: _t.TextIO, istream: _t.TextIO, timeout: float = 0
1077 ) -> str:
1078 assert _ISTREAM_HANDLE is not None
1080 CHAR16 = ctypes.wintypes.WCHAR * 16
1082 n_read = ctypes.wintypes.DWORD()
1083 buffer = CHAR16()
1085 success = _ReadConsoleW(
1086 _ISTREAM_HANDLE,
1087 ctypes.byref(buffer),
1088 16,
1089 ctypes.byref(n_read),
1090 0,
1091 )
1092 if not success:
1093 raise ctypes.WinError()
1095 return buffer.value
1097 def _flush_input_buffer(ostream: _t.TextIO, istream: _t.TextIO):
1098 assert _ISTREAM_HANDLE is not None
1100 success = _FlushConsoleInputBuffer(_ISTREAM_HANDLE)
1101 if not success:
1102 raise ctypes.WinError()
1104 def _enable_vt_processing(ostream: _t.TextIO) -> bool:
1105 try:
1106 version = sys.getwindowsversion()
1107 if version.major < 10 or version.build < 14931:
1108 return False
1110 handle = msvcrt.get_osfhandle(ostream.fileno())
1111 return bool(
1112 _SetConsoleMode(
1113 handle,
1114 _ENABLE_PROCESSED_OUTPUT
1115 | _ENABLE_WRAP_AT_EOL_OUTPUT
1116 | _ENABLE_VIRTUAL_TERMINAL_PROCESSING,
1117 )
1118 )
1119 except Exception: # pragma: no cover
1120 return False
1122 def _pause():
1123 pass
1125else: # pragma: no cover
1127 def _is_foreground(stream: _t.TextIO | None) -> bool:
1128 return False
1130 @contextlib.contextmanager
1131 def _enter_raw_mode(
1132 ostream: _t.TextIO,
1133 istream: _t.TextIO,
1134 bracketed_paste: bool = False,
1135 modify_keyboard: bool = False,
1136 ):
1137 raise OSError("not supported")
1138 yield
1140 def _read_keycode(
1141 ostream: _t.TextIO, istream: _t.TextIO, timeout: float = 0
1142 ) -> str:
1143 raise OSError("not supported")
1145 def _flush_input_buffer(ostream: _t.TextIO, istream: _t.TextIO):
1146 raise OSError("not supported")
1148 def _enable_vt_processing(ostream: _t.TextIO) -> bool:
1149 raise OSError("not supported")
1151 def _pause():
1152 raise OSError("not supported")