Coverage for yuio / term.py: 70%
458 statements
« prev ^ index » next coverage.py v7.13.3, created at 2026-02-03 15:42 +0000
« prev ^ index » next coverage.py v7.13.3, created at 2026-02-03 15:42 +0000
1# Yuio project, MIT license.
2#
3# https://github.com/taminomara/yuio/
4#
5# You're free to copy this file to your project and edit it for your needs,
6# just keep this copyright line please :3
8"""
9Querying terminal info and working with ANSI terminals.
11This is a low-level module upon which :mod:`yuio.io` builds
12its higher-level abstraction.
15Detecting terminal capabilities
16-------------------------------
18Terminal capabilities are stored in a :class:`Term` object.
20Usually, you don't need to query terminal capabilities yourself,
21as you can use Yuio's global configuration from :mod:`yuio.io`
22(see :func:`yuio.io.get_term`).
24However, you can get a :class:`Term` object by using :func:`get_term_from_stream`:
26.. autofunction:: get_term_from_stream
28.. autofunction:: get_tty
30.. autoclass:: Term
31 :members:
33.. autoclass:: TerminalTheme
34 :members:
36.. autoclass:: Lightness
37 :members:
40Utilities
41---------
43.. autofunction:: stream_is_unicode
45.. autofunction:: get_tty_size
47.. autofunction:: detect_ci
49.. autofunction:: detect_ci_color_support
52Re-imports
53----------
55.. type:: ColorSupport
56 :no-index:
58 Alias of :obj:`yuio.color.ColorSupport`.
60"""
62from __future__ import annotations
64import atexit
65import contextlib
66import dataclasses
67import enum
68import io
69import locale
70import os
71import re
72import shutil
73import sys
74import threading
75from dataclasses import dataclass
77import yuio
78import yuio.color
79from yuio.color import ColorSupport
80from yuio.util import ClosedIO as _ClosedIO
82from typing import TYPE_CHECKING
84if TYPE_CHECKING:
85 import typing_extensions as _t
86else:
87 from yuio import _typing as _t
# Names exported by ``from yuio.term import *``.
__all__ = [
    "ColorSupport",
    "Lightness",
    "Term",
    "TerminalTheme",
    "detect_ci",
    "detect_ci_color_support",
    "get_term_from_stream",
    "get_tty",
    "get_tty_size",
    "stream_is_unicode",
]

T = _t.TypeVar("T")


# Held by `_prepare_tty` while the one-time TTY setup runs.
_LOCK = threading.Lock()

# These variables are set in `_prepare_tty`.
_TTY_SETUP_PERFORMED: bool = False
_TTY_OUTPUT: _t.TextIO | None
_TTY_INPUT: _t.TextIO | None
_TERMINAL_THEME: TerminalTheme | None
_EXPLICIT_COLOR_SUPPORT: ColorSupport | bool | None
_COLOR_SUPPORT: ColorSupport


# Redefine canonical streams so that we don't monkeypatch `sys.__std*__` in tests.
__stdin = sys.__stdin__
__stdout = sys.__stdout__
__stderr = sys.__stderr__
class Lightness(enum.Enum):
    """
    Coarse classification of a terminal's color scheme.

    Useful when choosing colors that will stay readable on the terminal's
    background.

    """

    UNKNOWN = enum.auto()
    """
    Terminal background could not be determined, or it is neither dark
    nor bright enough to be classified.

    """

    DARK = enum.auto()
    """
    Terminal has a dark background.

    """

    LIGHT = enum.auto()
    """
    Terminal has a light background.

    """
150@dataclass(frozen=True, slots=True)
151class TerminalTheme:
152 """
153 Colors and theme of the attached terminal.
155 """
157 background: yuio.color.ColorValue
158 """
159 Background color of a terminal.
161 """
163 foreground: yuio.color.ColorValue
164 """
165 Foreground color of a terminal.
167 """
169 black: yuio.color.ColorValue
170 """
171 Color value for the default "black" color.
173 """
175 bright_black: yuio.color.ColorValue
176 """
177 Color value for the default "bright black" color.
179 """
181 red: yuio.color.ColorValue
182 """
183 Color value for the default "red" color.
185 """
187 bright_red: yuio.color.ColorValue
188 """
189 Color value for the default "bright red" color.
191 """
193 green: yuio.color.ColorValue
194 """
195 Color value for the default "green" color.
197 """
199 bright_green: yuio.color.ColorValue
200 """
201 Color value for the default "bright green" color.
203 """
205 yellow: yuio.color.ColorValue
206 """
207 Color value for the default "yellow" color.
209 """
211 bright_yellow: yuio.color.ColorValue
212 """
213 Color value for the default "bright yellow" color.
215 """
217 blue: yuio.color.ColorValue
218 """
219 Color value for the default "blue" color.
221 """
223 bright_blue: yuio.color.ColorValue
224 """
225 Color value for the default "bright blue" color.
227 """
229 magenta: yuio.color.ColorValue
230 """
231 Color value for the default "magenta" color.
233 """
235 bright_magenta: yuio.color.ColorValue
236 """
237 Color value for the default "bright magenta" color.
239 """
241 cyan: yuio.color.ColorValue
242 """
243 Color value for the default "cyan" color.
245 """
247 bright_cyan: yuio.color.ColorValue
248 """
249 Color value for the default "bright cyan" color.
251 """
253 white: yuio.color.ColorValue
254 """
255 Color value for the default "white" color.
257 """
259 bright_white: yuio.color.ColorValue
260 """
261 Color value for the default "bright white" color.
263 """
265 lightness: Lightness
266 """
267 Overall color theme of a terminal, i.e. dark or light.
269 """
@dataclass(frozen=True, slots=True)
class Term:
    """
    This class contains all info about what kinds of things the terminal
    supports. If available, it will also have info about terminal's theme,
    i.e. dark or light background, etc.

    """

    ostream: _t.TextIO
    """
    Terminal's output stream.

    """

    istream: _t.TextIO
    """
    Terminal's input stream.

    """

    color_support: ColorSupport = dataclasses.field(
        default=ColorSupport.NONE, kw_only=True
    )
    """
    Terminal's capability for coloring output.

    """

    ostream_is_tty: bool = dataclasses.field(default=False, kw_only=True)
    """
    Output is connected to a terminal, and we're not in CI.

    Note that output being connected to a TTY doesn't mean that it's interactive:
    this process can be in background.

    """

    istream_is_tty: bool = dataclasses.field(default=False, kw_only=True)
    """
    Input is connected to a terminal, and we're not in CI.

    Note that input being connected to a TTY doesn't mean that it's interactive:
    this process can be in background.

    """

    terminal_theme: TerminalTheme | None = dataclasses.field(default=None, kw_only=True)
    """
    Terminal's default foreground, background, and text colors.

    """

    is_unicode: bool = dataclasses.field(default=False, kw_only=True)
    """
    Terminal's output supports unicode characters.

    """

    @property
    def supports_colors(self) -> bool:
        """
        Return :data:`True` if terminal supports simple 8-bit color codes.

        """

        return self.color_support >= ColorSupport.ANSI

    @property
    def supports_colors_256(self) -> bool:
        """
        Return :data:`True` if terminal supports 256-encoded colors.

        """

        return self.color_support >= ColorSupport.ANSI_256

    @property
    def supports_colors_true(self) -> bool:
        """
        Return :data:`True` if terminal supports true colors.

        """

        return self.color_support >= ColorSupport.ANSI_TRUE

    @property
    def is_tty(self) -> bool:
        """
        Return :data:`True` if input and output are connected to a TTY. In this mode
        we can interact with the user by writing and reading lines of text.

        """

        return self.istream_is_tty and self.ostream_is_tty

    @property
    def can_run_widgets(self) -> bool:
        """
        Return :data:`True` if input and output are interactive and colors
        are supported. In this mode we can run interactive widgets.

        """

        return self.color_support >= ColorSupport.ANSI and self.is_tty

    @staticmethod
    def make_dummy(is_unicode: bool = True) -> Term:
        """
        Make a dummy terminal with closed streams and no capabilities.

        :param is_unicode:
            value of the :attr:`~Term.is_unicode` flag of the returned terminal.

        """

        # Both streams are placeholders; all other capabilities keep their
        # "nothing is supported" defaults.
        return Term(
            istream=_ClosedIO(),
            ostream=_ClosedIO(),
            is_unicode=is_unicode,
        )
# Environment variables whose presence marks a known CI environment;
# consulted by `detect_ci` and `detect_ci_color_support`.
_CI_ENV_VARS = [
    "TRAVIS",
    "CIRCLECI",
    "APPVEYOR",
    "GITLAB_CI",
    "BUILDKITE",
    "DRONE",
    "TEAMCITY_VERSION",
    "GITHUB_ACTIONS",
]
def stream_is_unicode(stream: _t.TextIO, /) -> bool:
    """
    Determine if stream's encoding is some version of unicode.

    Stream's ``encoding`` attribute is used when it is present and non-empty;
    otherwise, locale's preferred encoding is checked instead.

    """

    name = getattr(stream, "encoding", None)
    if not name:
        name = locale.getpreferredencoding()
    # Case-insensitive check, so that e.g. both `UTF-8` and `utf8` match.
    name = (name or "").casefold()
    return any(marker in name for marker in ("utf", "unicode"))
def get_tty_size(fallback: tuple[int, int] = (80, 24)):
    """
    Like :func:`shutil.get_terminal_size`, but uses TTY stream if it's available.

    Positive values of ``COLUMNS`` and ``LINES`` environment variables take
    precedence; whatever they don't provide is queried from the TTY output stream.

    :param fallback:
        tuple with width and height that will be used if query fails.

    """

    _prepare_tty()

    def env_dimension(name):
        # Missing or non-numeric variable counts as "not set".
        try:
            return int(os.environ[name])
        except (KeyError, ValueError):
            return 0

    columns = env_dimension("COLUMNS")
    lines = env_dimension("LINES")

    if columns > 0 and lines > 0:
        return os.terminal_size((columns, lines))

    try:
        queried = os.get_terminal_size(_TTY_OUTPUT.fileno())  # type: ignore
    except (AttributeError, ValueError, OSError):
        # Stream is missing, closed, detached, or not a terminal, or
        # `os.get_terminal_size` is unsupported.
        queried = os.terminal_size(fallback)

    if columns <= 0:
        columns = queried.columns or fallback[0]
    if lines <= 0:
        lines = queried.lines or fallback[1]

    return os.terminal_size((columns, lines))
def detect_ci() -> bool:
    """
    Check environment variables to see if we're running in a known CI environment.

    Generic ``CI`` variable is checked first, then variables of specific CI systems.

    """

    if "CI" in os.environ:
        return True
    return any(name in os.environ for name in _CI_ENV_VARS)
def detect_ci_color_support() -> ColorSupport:
    """
    Check environment variables to pick a level of color support appropriate
    for the current CI environment.

    GitHub Actions get true colors, other known CI systems get basic ANSI colors.

    If we're not in CI, return :attr:`ColorSupport.NONE <yuio.color.ColorSupport.NONE>`.

    """

    env = os.environ
    if "GITHUB_ACTIONS" in env:
        return ColorSupport.ANSI_TRUE
    if any(name in env for name in _CI_ENV_VARS):
        return ColorSupport.ANSI
    return ColorSupport.NONE
def get_tty() -> Term:
    """
    Query info about TTY.

    On Unix, this returns terminal connected to ``/dev/tty``. On Windows, this returns
    terminal connected to ``CONIN$``/``CONOUT$``.

    If no TTY stream was found, canonical ``stderr``/``stdin`` are used
    as a fallback.

    .. note::

        Prefer using ``stderr`` for normal IO: your users expect to be able to redirect
        messages from your program.

        Only use ``/dev/tty`` for querying passwords or other things that must not
        be redirected.

    """

    _prepare_tty()

    out = _TTY_OUTPUT or __stderr
    inp = _TTY_INPUT or __stdin
    assert out is not None
    assert inp is not None

    return get_term_from_stream(out, inp, allow_env_overrides=True)
def get_term_from_stream(
    ostream: _t.TextIO,
    istream: _t.TextIO | None = None,
    /,
    *,
    allow_env_overrides: bool = False,
) -> Term:
    """
    Query info about a terminal attached to the given stream.

    :param ostream:
        output stream.
    :param istream:
        input stream. If not given, widgets will not work with this terminal.
    :param allow_env_overrides:
        honor environment variables and CLI flags when determining capabilities
        of streams.

    """

    unicode_ok = stream_is_unicode(ostream)

    # For building docs in github.
    if (
        istream is not None and "YUIO_FORCE_FULL_TERM_SUPPORT" in os.environ
    ):  # pragma: no cover
        return Term(
            ostream=ostream,
            istream=istream,
            color_support=ColorSupport.ANSI_TRUE,
            ostream_is_tty=True,
            istream_is_tty=True,
            is_unicode=unicode_ok,
        )

    _prepare_tty()

    out_tty = _output_is_tty(ostream)
    in_tty = _input_is_tty(istream)
    outside_ci = not detect_ci()

    # Colors are enabled for TTY outputs, or when the user explicitly configured
    # colors and the caller asked us to honor that.
    explicit_override = allow_env_overrides and _EXPLICIT_COLOR_SUPPORT is not None
    colors = _COLOR_SUPPORT if (out_tty or explicit_override) else ColorSupport.NONE

    # No input stream given: substitute a closed one.
    if istream is None:
        istream = _ClosedIO()

    return Term(
        ostream=ostream,
        istream=istream,
        color_support=colors,
        ostream_is_tty=out_tty and outside_ci,
        istream_is_tty=in_tty and outside_ci,
        terminal_theme=_TERMINAL_THEME,
        is_unicode=unicode_ok,
    )
def _prepare_tty():
    """
    Run `_do_prepare_tty` unless TTY setup was already performed.

    """

    # Cheap check without taking the lock.
    if _TTY_SETUP_PERFORMED:
        return

    with _LOCK:
        # Check again: setup could have completed while we were waiting for the lock.
        if not _TTY_SETUP_PERFORMED:
            _do_prepare_tty()
def _do_prepare_tty():
    """
    Find TTY streams, detect color support, and try to fetch terminal theme.

    Results are stored in module-level variables.

    """

    global _TTY_SETUP_PERFORMED, _TERMINAL_THEME, _EXPLICIT_COLOR_SUPPORT, _COLOR_SUPPORT

    _find_tty()

    # NOTE(review): the flag is raised before the rest of the setup; presumably
    # this keeps nested `_prepare_tty()` calls made while querying the terminal
    # (e.g. from the Windows `_enter_raw_mode`) from starting setup again -- confirm.
    _TTY_SETUP_PERFORMED = True

    # No theme until we manage to query it at the very end.
    _TERMINAL_THEME = None

    # `--color` flags and `FORCE_COLOR`/`NO_COLOR` variables, if any.
    explicit = _EXPLICIT_COLOR_SUPPORT = _detect_explicit_color_settings()

    if explicit is False:
        # User disabled colors, there's nothing else to detect.
        _COLOR_SUPPORT = ColorSupport.NONE
        return

    if explicit is True:
        # User asked for colors: ANSI at the very least, possibly more.
        _COLOR_SUPPORT = max(ColorSupport.ANSI, _detect_color_support_from_env())
    elif explicit is None:
        # No preference: go with whatever the environment suggests.
        _COLOR_SUPPORT = _detect_color_support_from_env()
    else:
        # User gave an exact level of color support.
        _COLOR_SUPPORT = explicit

    if _TTY_OUTPUT is None:
        # No TTY output found, so color support can't be improved any further.
        return

    if os.name == "nt":
        if not _enable_vt_processing(_TTY_OUTPUT):
            # This version of Windows does not support colors.
            return
        # VT processing is on; upgrade unless user requested an exact level.
        if explicit is None or explicit is True:
            _COLOR_SUPPORT = ColorSupport.ANSI_TRUE

    if _TTY_INPUT is None:
        # No TTY input found, so we can't read terminal's responses.
        return

    if not (_is_foreground(_TTY_OUTPUT) and _is_foreground(_TTY_INPUT)):
        # Background processes can't fetch colors.
        return

    if detect_ci():
        # Can't fetch colors in CI.
        return

    if not _is_tty(__stdin):
        # We don't want to query colors if our stdin is redirected: this is a sign
        # that this program runs in some sort of a pipeline, and multiple instances
        # of it might run at the same time. If this happens, several processes/threads
        # can interact with the same TTY, leading to garbled output.
        return

    if _COLOR_SUPPORT >= ColorSupport.ANSI:
        # TTY is available and colors are supported, try fetching terminal theme.
        _TERMINAL_THEME = _get_standard_colors(_TTY_OUTPUT, _TTY_INPUT)
def _find_tty():
    """
    Locate TTY streams and store them in `_TTY_OUTPUT` and `_TTY_INPUT`.

    First, try opening ``CONIN$``/``CONOUT$`` on Windows or ``/dev/tty`` elsewhere.
    If that fails, fall back to canonical ``stderr``/``stdout`` and ``stdin``,
    provided that they are TTYs. Whatever can't be found is left as :data:`None`.

    """

    global _TTY_OUTPUT, _TTY_INPUT

    _TTY_OUTPUT = _TTY_INPUT = None

    # Owns the streams that we open; closed right away on failure,
    # or at interpreter exit on success.
    opened = contextlib.ExitStack()
    try:
        if os.name == "nt":
            console_in = io._WindowsConsoleIO("CONIN$", "r")  # type: ignore
            tty_in = opened.enter_context(
                io.TextIOWrapper(console_in, encoding="utf-8")
            )
            console_out = io._WindowsConsoleIO("CONOUT$", "w")  # type: ignore
            tty_out = opened.enter_context(
                io.TextIOWrapper(console_out, encoding="utf-8")
            )
        else:
            fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY)
            raw = io.FileIO(fd, "w+")
            raw.name = "/dev/tty"
            # A single stream serves as both input and output.
            tty_in = tty_out = opened.enter_context(io.TextIOWrapper(raw))
    except (OSError, AttributeError):
        # Can't open the TTY directly; fall through to the standard streams.
        opened.close()
    except BaseException:
        opened.close()
        raise
    else:
        atexit.register(opened.close)
        _TTY_INPUT = tty_in
        _TTY_OUTPUT = tty_out
        return

    # Prefer `stderr` over `stdout` for output.
    _TTY_OUTPUT = next(
        (
            candidate
            for candidate in (__stderr, __stdout)
            if candidate is not None and _output_is_tty(candidate)
        ),
        None,
    )
    if __stdin is not None and _input_is_tty(__stdin):
        _TTY_INPUT = __stdin
def _get_standard_colors(
    ostream: _t.TextIO, istream: _t.TextIO
) -> TerminalTheme | None:
    """
    Query terminal's foreground, background, and palette colors,
    and build a :class:`TerminalTheme` out of the response.

    Returns :data:`None` if ``YUIO_DISABLE_OSC_QUERIES`` is set, if there is
    no response, if the response can't be parsed, if any of the palette colors
    ``0`` to ``7`` is missing, or if any error happens along the way.

    """

    if "YUIO_DISABLE_OSC_QUERIES" in os.environ:
        return None

    def to_byte(component):
        # Components come with 2 to 4 hex digits; keep the two most significant.
        return int(component, 16) // 16 ** (len(component) - 2)

    try:
        palette_queries = [f"\x1b]4;{i};?\x1b\\" for i in range(16)]
        query = "\x1b]10;?\x1b\\\x1b]11;?\x1b\\" + "".join(palette_queries)
        response = _query_term(ostream, istream, query)
        if not response:
            return None

        # Response starts with the foreground color...

        fg_match = re.match(
            r"^\x1b]10;rgb:([0-9a-f]{2,4})/([0-9a-f]{2,4})/([0-9a-f]{2,4})(?:\x1b\\|\a)",
            response,
            re.IGNORECASE,
        )
        if fg_match is None:  # pragma: no cover
            return None

        foreground = yuio.color.ColorValue.from_rgb(*map(to_byte, fg_match.groups()))

        response = response[fg_match.end() :]

        # ...followed by the background color...

        bg_match = re.match(
            r"^\x1b]11;rgb:([0-9a-f]{2,4})/([0-9a-f]{2,4})/([0-9a-f]{2,4})(?:\x1b\\|\a)",
            response,
            re.IGNORECASE,
        )
        if bg_match is None:  # pragma: no cover
            return None

        r, g, b = map(to_byte, bg_match.groups())
        background = yuio.color.ColorValue.from_rgb(r, g, b)

        # Background's luma decides whether the theme is dark or light.
        luma = (0.2627 * r + 0.6780 * g + 0.0593 * b) / 256
        if luma <= 0.2:
            lightness = Lightness.DARK
        elif luma >= 0.85:
            lightness = Lightness.LIGHT
        else:
            lightness = Lightness.UNKNOWN

        response = response[bg_match.end() :]

        # ...and then by palette colors, each tagged with its index.

        colors = {}

        while response:
            color_match = re.match(
                r"^\x1b]4;(\d+);rgb:([0-9a-f]{2,4})/([0-9a-f]{2,4})/([0-9a-f]{2,4})(?:\x1b\\|\a)",
                response,
                re.IGNORECASE,
            )
            if color_match is None:  # pragma: no cover
                return None

            index, *components = color_match.groups()
            colors[int(index)] = yuio.color.ColorValue.from_rgb(
                *map(to_byte, components)
            )

            response = response[color_match.end() :]

        # Normal colors are required; bright ones fall back to their normal versions.
        if any(k not in colors for k in range(8)):
            return None

        return TerminalTheme(
            background=background,
            foreground=foreground,
            black=colors[0],
            bright_black=colors.get(8, colors[0]),
            red=colors[1],
            bright_red=colors.get(9, colors[1]),
            green=colors[2],
            bright_green=colors.get(10, colors[2]),
            yellow=colors[3],
            bright_yellow=colors.get(11, colors[3]),
            blue=colors[4],
            bright_blue=colors.get(12, colors[4]),
            magenta=colors[5],
            bright_magenta=colors.get(13, colors[5]),
            cyan=colors[6],
            bright_cyan=colors.get(14, colors[6]),
            white=colors[7],
            bright_white=colors.get(15, colors[7]),
            lightness=lightness,
        )

    except Exception:  # pragma: no cover
        return None
def _query_term(ostream: _t.TextIO, istream: _t.TextIO, query: str) -> str | None:
    """
    Send a query to the terminal and read its response.

    A DA1 query is appended to the given one, and input is read until
    a DA1 response shows up.

    :param ostream:
        stream to write the query to.
    :param istream:
        stream to read the response from.
    :param query:
        escape sequences to send.
    :returns:
        everything that was read before the DA1 response, or :data:`None`
        if the response doesn't start with an escape character or an error
        (including a read timeout) happens.

    """

    try:
        # Lock the keyboard (set keyboard action mode).
        ostream.write("\x1b[2h")
        ostream.flush()

        with _enter_raw_mode(ostream, istream):
            _flush_input_buffer(ostream, istream)

            # It is important that we unlock keyboard before exiting raw mode,
            # hence the nested `try`.
            try:
                # Append a DA1 query, as virtually all terminals support it.
                ostream.write(query + "\x1b[c")
                ostream.flush()

                buf = _read_keycode(ostream, istream, timeout=0.250)
                if not buf.startswith("\x1b"):
                    yuio._logger.warning("_query_term invalid response: %r", buf)
                    return None

                # Read till we find a DA1 response.
                while not re.search(r"\x1b\[\?.*?c", buf):
                    buf += _read_keycode(ostream, istream, timeout=0.250)

                return buf[: buf.index("\x1b[?")]
            finally:
                _flush_input_buffer(ostream, istream)

                # Release the keyboard. Mode set with `CSI 2 h` is reset
                # with `CSI 2 l`; `CSI 2 i` is a media copy command.
                ostream.write("\x1b[2l")
                ostream.flush()
    except Exception:  # pragma: no cover
        yuio._logger.warning("_query_term error", exc_info=True)
        return None
    finally:
        # Release the keyboard once more: we might've failed before
        # reaching the inner `try`.
        ostream.write("\x1b[2l")
        ostream.flush()
def _detect_explicit_color_settings() -> ColorSupport | bool | None:
    """
    Find out if user explicitly enabled or disabled colors.

    Environment variables are checked first, then CLI flags; later settings
    override earlier ones.

    :returns:
        :data:`True` or :data:`False` if colors were enabled or disabled,
        a specific :class:`ColorSupport` level if one was requested
        via ``--color=...``, or :data:`None` if nothing was specified.

    """

    result = None

    env = os.environ
    if "FORCE_COLOR" in env:
        result = True
    if "NO_COLOR" in env or "FORCE_NO_COLOR" in env:
        result = False

    # Note: we don't rely on argparse to parse flags and send them to us
    # because these functions can be called before parsing arguments.
    for arg in sys.argv[1:]:
        if arg in ("--color", "--force-color"):
            result = True
            continue
        if arg in ("--no-color", "--force-no-color"):
            result = False
            continue
        if not arg.startswith(("--color=", "--colors=")):
            continue

        # Normalize the value: `ansi-256`, `ANSI_256` and `ansi256` are all the same.
        _, _, raw_value = arg.partition("=")
        value = raw_value.replace("_", "").replace("-", "").casefold()

        if value in ["1", "yes", "true"]:
            result = True
        elif value in ["0", "no", "false"]:
            result = False
        elif value == "ansi":
            result = ColorSupport.ANSI
        elif value == "ansi256":
            result = ColorSupport.ANSI_256
        elif value == "ansitrue":
            result = ColorSupport.ANSI_TRUE
        # Unknown values leave the current setting untouched.

    return result
def _detect_color_support_from_env() -> ColorSupport:
    """
    Guess level of color support from CI variables, ``TERM``, and ``COLORTERM``.

    """

    term = os.environ.get("TERM", "").lower()
    colorterm = os.environ.get("COLORTERM", "").lower()

    if detect_ci():
        return detect_ci_color_support()

    if os.name == "nt":
        return ColorSupport.NONE

    if term == "xterm-kitty" or colorterm in ("truecolor", "24bit"):
        return ColorSupport.ANSI_TRUE

    if colorterm in ("yes", "true") or "256color" in term or term == "screen":
        # `xterm-256color` on a system that has `wslinfo` gets true colors.
        has_wslinfo = (
            os.name == "posix" and term == "xterm-256color" and shutil.which("wslinfo")
        )
        return ColorSupport.ANSI_TRUE if has_wslinfo else ColorSupport.ANSI_256

    if any(marker in term for marker in ("linux", "color", "ansi", "xterm")):
        return ColorSupport.ANSI

    return ColorSupport.NONE
def _is_tty(stream: _t.TextIO | None) -> bool:
    """
    Check if stream is a TTY. Missing and broken streams are not TTYs.

    """

    if stream is None:
        return False
    try:
        return stream.isatty()
    except Exception:  # pragma: no cover
        return False
def _input_is_tty(stream: _t.TextIO | None) -> bool:
    """
    Check if stream is a TTY that can be read from.

    """

    if stream is None:
        return False
    try:
        return _is_tty(stream) and stream.readable()
    except Exception:  # pragma: no cover
        return False
def _output_is_tty(stream: _t.TextIO | None) -> bool:
    """
    Check if stream is a TTY that can be written to.

    """

    if stream is None:
        return False
    try:
        return _is_tty(stream) and stream.writable()
    except Exception:  # pragma: no cover
        return False
@contextlib.contextmanager
def _modify_keyboard(
    ostream: _t.TextIO,
    bracketed_paste: bool = False,
    modify_keyboard: bool = False,
):
    """
    Context manager that switches keyboard-related terminal modes on for the
    duration of the ``with`` block, and switches them back off afterwards.

    Nothing is written to the stream if both flags are :data:`False`.

    :param ostream:
        stream to write escape sequences to.
    :param bracketed_paste:
        write ``CSI ? 2004 h`` on enter and ``CSI ? 2004 l`` on exit.
    :param modify_keyboard:
        write ``CSI > 1 u`` on enter and ``CSI < u`` on exit.

    """

    prologue = []
    epilogue = []
    if bracketed_paste:
        prologue.append("\x1b[?2004h")
        epilogue.append("\x1b[?2004l")
    if modify_keyboard:
        prologue.append("\x1b[>1u")
        # Undo only what we did: this sequence must not be sent
        # unless the one above was sent first.
        epilogue.append("\x1b[<u")

    if prologue:
        ostream.write("".join(prologue))
        ostream.flush()
    try:
        yield
    finally:
        if epilogue:
            ostream.write("".join(epilogue))
            ostream.flush()
# Platform-specific code for working with terminals.
#
# Every branch defines the same set of helpers: `_is_foreground`,
# `_enter_raw_mode`, `_read_keycode`, `_flush_input_buffer`,
# `_enable_vt_processing`, and `_pause`.
if os.name == "posix":
    import select
    import signal
    import termios
    import tty

    def _is_foreground(stream: _t.TextIO | None) -> bool:
        """
        Check if our process group is the foreground process group
        of the terminal behind the stream.

        """

        try:
            return stream is not None and os.getpgrp() == os.tcgetpgrp(stream.fileno())
        except Exception:  # pragma: no cover
            return False

    @contextlib.contextmanager
    def _enter_raw_mode(
        ostream: _t.TextIO,
        istream: _t.TextIO,
        bracketed_paste: bool = False,
        modify_keyboard: bool = False,
    ):
        """
        Context manager that disables echo, line editing, and signal keys
        on the input stream, and restores previous terminal settings on exit.

        ``bracketed_paste`` and ``modify_keyboard`` are forwarded
        to `_modify_keyboard`.

        """

        prev_mode = termios.tcgetattr(istream)
        new_mode = prev_mode.copy()
        new_mode[tty.LFLAG] &= ~(
            termios.ECHO  # Don't print back what user types.
            | termios.ICANON  # Disable line editing.
            | termios.ISIG  # Disable signals on C-c and C-z.
        )
        # Copy the nested list so that `prev_mode` stays intact.
        new_mode[tty.CC] = new_mode[tty.CC].copy()
        new_mode[tty.CC][termios.VMIN] = 1
        new_mode[tty.CC][termios.VTIME] = 0
        termios.tcsetattr(istream, termios.TCSAFLUSH, new_mode)

        try:
            with _modify_keyboard(ostream, bracketed_paste, modify_keyboard):
                yield
        finally:
            termios.tcsetattr(istream, termios.TCSAFLUSH, prev_mode)

    def _read_keycode(
        ostream: _t.TextIO, istream: _t.TextIO, timeout: float = 0
    ) -> str:
        """
        Read everything that's currently available on the input stream.

        :param timeout:
            how long to wait for the input to become available, in seconds.
            Zero means wait indefinitely.
        :raises TimeoutError:
            if timeout is given and nothing arrives in time.

        """

        if timeout and not bool(select.select([istream], [], [], timeout)[0]):
            raise TimeoutError()
        key = os.read(istream.fileno(), 128)
        # Drain the rest of the input without blocking.
        while bool(select.select([istream], [], [], 0)[0]):
            key += os.read(istream.fileno(), 128)

        return key.decode(istream.encoding, errors="replace")

    def _flush_input_buffer(ostream: _t.TextIO, istream: _t.TextIO):
        """
        Do nothing; this branch has no explicit input flush.

        """

        pass

    def _enable_vt_processing(ostream: _t.TextIO) -> bool:
        """
        Always return :data:`False` on this platform.

        """

        return False  # This is a windows functionality

    def _pause():
        """
        Send ``SIGTSTP`` to the current process.

        """

        os.kill(os.getpid(), signal.SIGTSTP)

elif os.name == "nt":
    import ctypes
    import ctypes.wintypes
    import msvcrt

    _FlushConsoleInputBuffer = ctypes.windll.kernel32.FlushConsoleInputBuffer
    _FlushConsoleInputBuffer.argtypes = [ctypes.wintypes.HANDLE]
    _FlushConsoleInputBuffer.restype = ctypes.wintypes.BOOL

    _GetConsoleMode = ctypes.windll.kernel32.GetConsoleMode
    _GetConsoleMode.argtypes = [ctypes.wintypes.HANDLE, ctypes.wintypes.LPDWORD]
    _GetConsoleMode.restype = ctypes.wintypes.BOOL

    _SetConsoleMode = ctypes.windll.kernel32.SetConsoleMode
    _SetConsoleMode.argtypes = [ctypes.wintypes.HANDLE, ctypes.wintypes.DWORD]
    _SetConsoleMode.restype = ctypes.wintypes.BOOL

    _GetConsoleWindow = ctypes.windll.kernel32.GetConsoleWindow
    _GetConsoleWindow.argtypes = []
    _GetConsoleWindow.restype = ctypes.wintypes.HWND

    _IsWindowVisible = ctypes.windll.user32.IsWindowVisible
    _IsWindowVisible.argtypes = [ctypes.wintypes.HWND]
    # Declare the return type of `IsWindowVisible` itself, like it is done
    # for every other function here.
    _IsWindowVisible.restype = ctypes.wintypes.BOOL

    _ReadConsoleW = ctypes.windll.kernel32.ReadConsoleW
    _ReadConsoleW.argtypes = [
        ctypes.wintypes.HANDLE,
        ctypes.wintypes.LPVOID,
        ctypes.wintypes.DWORD,
        ctypes.wintypes.LPDWORD,
        ctypes.wintypes.LPVOID,
    ]
    _ReadConsoleW.restype = ctypes.wintypes.BOOL

    # Console mode flags.
    _ENABLE_PROCESSED_OUTPUT = 0x0001
    _ENABLE_WRAP_AT_EOL_OUTPUT = 0x0002
    _ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x0004
    _ENABLE_VIRTUAL_TERMINAL_INPUT = 0x0200

    # OS handle of the console input; set on first use in `_enter_raw_mode`.
    _ISTREAM_HANDLE = None

    def _is_foreground(stream: _t.TextIO | None) -> bool:
        """
        Check if this process has a console window and that window is visible.

        The ``stream`` argument is not used.

        """

        window = _GetConsoleWindow()
        if not window:
            return False
        return bool(_IsWindowVisible(window))

    @contextlib.contextmanager
    def _enter_raw_mode(
        ostream: _t.TextIO,
        istream: _t.TextIO,
        bracketed_paste: bool = False,
        modify_keyboard: bool = False,
    ):
        """
        Context manager that switches console input to virtual terminal input
        mode, and restores previous console mode on exit.

        The handle is taken from `_TTY_INPUT` (or canonical ``stdin``) rather
        than from ``istream``. ``bracketed_paste`` and ``modify_keyboard``
        are forwarded to `_modify_keyboard`.

        :raises OSError:
            if console mode can't be read or changed.

        """

        global _ISTREAM_HANDLE

        if _ISTREAM_HANDLE is None:
            _prepare_tty()
            _ISTREAM_HANDLE = msvcrt.get_osfhandle((_TTY_INPUT or __stdin).fileno())  # type: ignore

        # Remember current mode so that we can restore it.
        mode = ctypes.wintypes.DWORD()
        success = _GetConsoleMode(_ISTREAM_HANDLE, ctypes.byref(mode))
        if not success:
            raise ctypes.WinError()
        success = _SetConsoleMode(_ISTREAM_HANDLE, _ENABLE_VIRTUAL_TERMINAL_INPUT)
        if not success:
            raise ctypes.WinError()

        try:
            with _modify_keyboard(ostream, bracketed_paste, modify_keyboard):
                yield
        finally:
            success = _SetConsoleMode(_ISTREAM_HANDLE, mode)
            if not success:
                raise ctypes.WinError()

    def _read_keycode(
        ostream: _t.TextIO, istream: _t.TextIO, timeout: float = 0
    ) -> str:
        """
        Read up to 16 characters from the console input.

        Must be called within `_enter_raw_mode`. The ``timeout`` argument
        is not used here.

        :raises OSError:
            if reading from the console fails.

        """

        assert _ISTREAM_HANDLE is not None

        CHAR16 = ctypes.wintypes.WCHAR * 16

        n_read = ctypes.wintypes.DWORD()
        buffer = CHAR16()

        success = _ReadConsoleW(
            _ISTREAM_HANDLE,
            ctypes.byref(buffer),
            16,
            ctypes.byref(n_read),
            0,
        )
        if not success:
            raise ctypes.WinError()

        return buffer.value

    def _flush_input_buffer(ostream: _t.TextIO, istream: _t.TextIO):
        """
        Discard pending console input.

        Must be called within `_enter_raw_mode`.

        :raises OSError:
            if flushing fails.

        """

        assert _ISTREAM_HANDLE is not None

        success = _FlushConsoleInputBuffer(_ISTREAM_HANDLE)
        if not success:
            raise ctypes.WinError()

    def _enable_vt_processing(ostream: _t.TextIO) -> bool:
        """
        Try enabling virtual terminal processing for the given output stream.

        Returns :data:`False` on Windows versions below 10 or builds
        below 14931, if the console mode can't be changed, or on any error.

        """

        try:
            version = sys.getwindowsversion()
            if version.major < 10 or version.build < 14931:
                return False

            handle = msvcrt.get_osfhandle(ostream.fileno())
            return bool(
                _SetConsoleMode(
                    handle,
                    _ENABLE_PROCESSED_OUTPUT
                    | _ENABLE_WRAP_AT_EOL_OUTPUT
                    | _ENABLE_VIRTUAL_TERMINAL_PROCESSING,
                )
            )
        except Exception:  # pragma: no cover
            return False

    def _pause():
        """
        Do nothing on this platform.

        """

        pass

else:  # pragma: no cover

    def _is_foreground(stream: _t.TextIO | None) -> bool:
        """
        Always return :data:`False` on unsupported platforms.

        """

        return False

    @contextlib.contextmanager
    def _enter_raw_mode(
        ostream: _t.TextIO,
        istream: _t.TextIO,
        bracketed_paste: bool = False,
        modify_keyboard: bool = False,
    ):
        """
        Always raise :class:`OSError` on unsupported platforms.

        """

        raise OSError("not supported")
        # Unreachable; makes this function a generator, as `contextmanager` requires.
        yield

    def _read_keycode(
        ostream: _t.TextIO, istream: _t.TextIO, timeout: float = 0
    ) -> str:
        """
        Always raise :class:`OSError` on unsupported platforms.

        """

        raise OSError("not supported")

    def _flush_input_buffer(ostream: _t.TextIO, istream: _t.TextIO):
        """
        Always raise :class:`OSError` on unsupported platforms.

        """

        raise OSError("not supported")

    def _enable_vt_processing(ostream: _t.TextIO) -> bool:
        """
        Always raise :class:`OSError` on unsupported platforms.

        """

        raise OSError("not supported")

    def _pause():
        """
        Always raise :class:`OSError` on unsupported platforms.

        """

        raise OSError("not supported")