Coverage for yuio / term.py: 70%
450 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-05 11:41 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-05 11:41 +0000
1# Yuio project, MIT license.
2#
3# https://github.com/taminomara/yuio/
4#
5# You're free to copy this file to your project and edit it for your needs,
6# just keep this copyright line please :3
8"""
9Querying terminal info and working with ANSI terminals.
11This is a low-level module upon which :mod:`yuio.io` builds
12its higher-level abstraction.
15Detecting terminal capabilities
16-------------------------------
18Terminal capabilities are stored in a :class:`Term` object.
20Usually, you don't need to query terminal capabilities yourself,
21as you can use Yuio's global configuration from :mod:`yuio.io`
22(see :func:`yuio.io.get_term`).
24However, you can get a :class:`Term` object by using :func:`get_term_from_stream`:
26.. autofunction:: get_term_from_stream
28.. autofunction:: get_tty
30.. autoclass:: Term
31 :members:
33.. autoclass:: TerminalTheme
34 :members:
36.. autoclass:: Lightness
37 :members:
40Utilities
41---------
43.. autofunction:: stream_is_unicode
45.. autofunction:: get_tty_size
47.. autofunction:: detect_ci
49.. autofunction:: detect_ci_color_support
52Re-imports
53----------
55.. type:: ColorSupport
56 :no-index:
58 Alias of :obj:`yuio.color.ColorSupport`.
60"""
62from __future__ import annotations
64import atexit
65import contextlib
66import dataclasses
67import enum
68import io
69import locale
70import os
71import re
72import shutil
73import sys
74import threading
75from dataclasses import dataclass
77import yuio
78import yuio.color
79from yuio.color import ColorSupport
81from typing import TYPE_CHECKING
83if TYPE_CHECKING:
84 import typing_extensions as _t
85else:
86 from yuio import _typing as _t
88__all__ = [
89 "ColorSupport",
90 "Lightness",
91 "Term",
92 "TerminalTheme",
93 "detect_ci",
94 "detect_ci_color_support",
95 "get_term_from_stream",
96 "get_tty",
97 "get_tty_size",
98 "stream_is_unicode",
99]
101T = _t.TypeVar("T")
104_LOCK = threading.Lock()
106# These variables are set in `_prepare_tty`.
107_TTY_SETUP_PERFORMED: bool = False
108_TTY_OUTPUT: _t.TextIO | None
109_TTY_INPUT: _t.TextIO | None
110_TERMINAL_THEME: TerminalTheme | None
111_EXPLICIT_COLOR_SUPPORT: ColorSupport | bool | None
112_COLOR_SUPPORT: ColorSupport
115# Redefine canonical streams so that we don't monkeypatch `sys.__std*__` in tests.
116__stdin = sys.__stdin__
117__stdout = sys.__stdout__
118__stderr = sys.__stderr__
121class Lightness(enum.Enum):
122 """
123 Overall color theme of a terminal.
125 Can help with deciding which colors to use when printing output.
127 """
129 UNKNOWN = enum.auto()
130 """
131 We couldn't determine terminal background, or it wasn't dark
132 or bright enough to fall in one category or another.
134 """
136 DARK = enum.auto()
137 """
138 Terminal background is dark.
140 """
142 LIGHT = enum.auto()
143 """
144 Terminal background is light.
146 """
149@dataclass(frozen=True, slots=True)
150class TerminalTheme:
151 """
152 Colors and theme of the attached terminal.
154 """
156 background: yuio.color.ColorValue
157 """
158 Background color of a terminal.
160 """
162 foreground: yuio.color.ColorValue
163 """
164 Foreground color of a terminal.
166 """
168 black: yuio.color.ColorValue
169 """
170 Color value for the default "black" color.
172 """
174 bright_black: yuio.color.ColorValue
175 """
176 Color value for the default "bright black" color.
178 """
180 red: yuio.color.ColorValue
181 """
182 Color value for the default "red" color.
184 """
186 bright_red: yuio.color.ColorValue
187 """
188 Color value for the default "bright red" color.
190 """
192 green: yuio.color.ColorValue
193 """
194 Color value for the default "green" color.
196 """
198 bright_green: yuio.color.ColorValue
199 """
200 Color value for the default "bright green" color.
202 """
204 yellow: yuio.color.ColorValue
205 """
206 Color value for the default "yellow" color.
208 """
210 bright_yellow: yuio.color.ColorValue
211 """
212 Color value for the default "bright yellow" color.
214 """
216 blue: yuio.color.ColorValue
217 """
218 Color value for the default "blue" color.
220 """
222 bright_blue: yuio.color.ColorValue
223 """
224 Color value for the default "bright blue" color.
226 """
228 magenta: yuio.color.ColorValue
229 """
230 Color value for the default "magenta" color.
232 """
234 bright_magenta: yuio.color.ColorValue
235 """
236 Color value for the default "bright magenta" color.
238 """
240 cyan: yuio.color.ColorValue
241 """
242 Color value for the default "cyan" color.
244 """
246 bright_cyan: yuio.color.ColorValue
247 """
248 Color value for the default "bright cyan" color.
250 """
252 white: yuio.color.ColorValue
253 """
254 Color value for the default "white" color.
256 """
258 bright_white: yuio.color.ColorValue
259 """
260 Color value for the default "bright white" color.
262 """
264 lightness: Lightness
265 """
266 Overall color theme of a terminal, i.e. dark or light.
268 """
271@dataclass(frozen=True, slots=True)
272class Term:
273 """
274 This class contains all info about what kinds of things the terminal
275 supports. If available, it will also have info about terminal's theme,
276 i.e. dark or light background, etc.
278 """
280 ostream: _t.TextIO
281 """
282 Terminal's output stream.
284 """
286 istream: _t.TextIO
287 """
288 Terminal's input stream.
290 """
292 color_support: ColorSupport = dataclasses.field(
293 default=ColorSupport.NONE, kw_only=True
294 )
295 """
296 Terminal's capability for coloring output.
298 """
300 ostream_is_tty: bool = dataclasses.field(default=False, kw_only=True)
301 """
302 Output is connecter to a terminal, and we're not in CI.
304 Note that output being connected to a TTY doesn't mean that it's interactive:
305 this process can be in background.
307 """
309 istream_is_tty: bool = dataclasses.field(default=False, kw_only=True)
310 """
311 Output is connecter to a terminal, and we're not in CI.
313 Note that output being connected to a TTY doesn't mean that it's interactive:
314 this process can be in background.
316 """
318 terminal_theme: TerminalTheme | None = dataclasses.field(default=None, kw_only=True)
319 """
320 Terminal's default foreground, background, and text colors.
322 """
324 is_unicode: bool = dataclasses.field(default=False, kw_only=True)
325 """
326 Terminal's output supports unicode characters.
328 """
330 @property
331 def supports_colors(self) -> bool:
332 """
333 Return :data:`True` if terminal supports simple 8-bit color codes.
335 """
337 return self.color_support >= ColorSupport.ANSI
339 @property
340 def supports_colors_256(self) -> bool:
341 """
342 Return :data:`True` if terminal supports 256-encoded colors.
344 """
346 return self.color_support >= ColorSupport.ANSI_256
348 @property
349 def supports_colors_true(self) -> bool:
350 """
351 Return :data:`True` if terminal supports true colors.
353 """
355 return self.color_support >= ColorSupport.ANSI_TRUE
357 @property
358 def is_tty(self) -> bool:
359 """
360 Return :data:`True` if input and output are connected to a TTY. In this mode
361 we can interact with the user by writing and reading lines of text.
363 """
365 return self.istream_is_tty and self.ostream_is_tty
367 @property
368 def can_run_widgets(self) -> bool:
369 """
370 Return :data:`True` if input and output are interactive and colors
371 are supported. In this mode we can run interactive widgets.
373 """
375 return self.color_support >= ColorSupport.ANSI and self.is_tty
378_CI_ENV_VARS = [
379 "TRAVIS",
380 "CIRCLECI",
381 "APPVEYOR",
382 "GITLAB_CI",
383 "BUILDKITE",
384 "DRONE",
385 "TEAMCITY_VERSION",
386 "GITHUB_ACTIONS",
387]
390def stream_is_unicode(stream: _t.TextIO, /) -> bool:
391 """
392 Determine of stream's encoding is some version of unicode.
394 """
396 encoding = getattr(stream, "encoding", None) or locale.getpreferredencoding() or ""
397 encoding = encoding.casefold()
398 return "utf" in encoding or "unicode" in encoding
401def get_tty_size(fallback: tuple[int, int] = (80, 24)):
402 """
403 Like :func:`shutil.get_terminal_size`, but uses TTY stream if it's available.
405 :param fallback:
406 tuple with width and height that will be used if query fails.
408 """
410 _prepare_tty()
412 try:
413 columns = int(os.environ["COLUMNS"])
414 except (KeyError, ValueError):
415 columns = 0
417 try:
418 lines = int(os.environ["LINES"])
419 except (KeyError, ValueError):
420 lines = 0
422 if columns <= 0 or lines <= 0:
423 try:
424 size = os.get_terminal_size(_TTY_OUTPUT.fileno()) # type: ignore
425 except (AttributeError, ValueError, OSError):
426 # stream is closed, detached, or not a terminal, or
427 # os.get_tty_size() is unsupported
428 size = os.terminal_size(fallback)
429 if columns <= 0:
430 columns = size.columns or fallback[0]
431 if lines <= 0:
432 lines = size.lines or fallback[1]
434 return os.terminal_size((columns, lines))
437def detect_ci() -> bool:
438 """
439 Scan environment variables to detect if we're in a known CI environment.
441 """
443 return "CI" in os.environ or any(ci in os.environ for ci in _CI_ENV_VARS)
446def detect_ci_color_support() -> ColorSupport:
447 """
448 Scan environment variables to detect an appropriate level of color support
449 of a CI environment.
451 If we're not in CI, return :attr:`ColorSupport.NONE <yuio.color.ColorSupport.NONE>`.
453 """
455 if "GITHUB_ACTIONS" in os.environ:
456 return ColorSupport.ANSI_TRUE
457 elif any(ci in os.environ for ci in _CI_ENV_VARS):
458 return ColorSupport.ANSI
459 else:
460 return ColorSupport.NONE
463def get_tty() -> Term:
464 """
465 Query info about TTY.
467 On Unix, this returns terminal connected to ``/dev/tty``. On Windows, this returns
468 terminal connected to ``CONIN$``/``CONOUT$``.
470 If opening any of these fails, returns :class:`Term` with ``stdin``/``stdout``
471 as a fallback.
473 .. note::
475 Prefer using ``stderr`` for normal IO: your users expect to be able to redirect
476 messages from your program.
478 Only use ``/dev/tty`` for querying passwords or other things that must not
479 be redirected.
481 """
483 _prepare_tty()
484 ostream = _TTY_OUTPUT or __stderr
485 istream = _TTY_INPUT or __stdin
486 assert ostream is not None
487 assert istream is not None
488 return get_term_from_stream(ostream, istream)
491def get_term_from_stream(
492 ostream: _t.TextIO,
493 istream: _t.TextIO,
494 /,
495) -> Term:
496 """
497 Query info about a terminal attached to the given stream.
499 :param ostream:
500 output stream.
501 :param istream:
502 input stream.
504 """
506 is_unicode = stream_is_unicode(ostream)
508 if (
509 # For building docs in github.
510 "YUIO_FORCE_FULL_TERM_SUPPORT" in os.environ
511 ): # pragma: no cover
512 return Term(
513 ostream=ostream,
514 istream=istream,
515 color_support=ColorSupport.ANSI_TRUE,
516 ostream_is_tty=True,
517 istream_is_tty=True,
518 is_unicode=is_unicode,
519 )
521 _prepare_tty()
523 output_is_tty = _output_is_tty(ostream)
524 input_is_tty = _input_is_tty(istream)
525 in_ci = detect_ci()
527 # Detect colors.
528 if output_is_tty or _EXPLICIT_COLOR_SUPPORT is not None:
529 color_support = _COLOR_SUPPORT
530 else:
531 color_support = ColorSupport.NONE
533 return Term(
534 ostream=ostream,
535 istream=istream,
536 color_support=color_support,
537 ostream_is_tty=output_is_tty and not in_ci,
538 istream_is_tty=input_is_tty and not in_ci,
539 terminal_theme=_TERMINAL_THEME,
540 is_unicode=is_unicode,
541 )
544def _prepare_tty():
545 if not _TTY_SETUP_PERFORMED:
546 with _LOCK:
547 if not _TTY_SETUP_PERFORMED:
548 _do_prepare_tty()
551def _do_prepare_tty():
552 global \
553 _TTY_SETUP_PERFORMED, \
554 _TERMINAL_THEME, \
555 _EXPLICIT_COLOR_SUPPORT, \
556 _COLOR_SUPPORT
558 _find_tty()
560 _TTY_SETUP_PERFORMED = True
562 # Theme is `None` for now, will query it later.
563 _TERMINAL_THEME = None
565 # Find out if user specified `--color` or `FORCE_COLOR`.
566 _EXPLICIT_COLOR_SUPPORT = _detect_explicit_color_settings()
568 # Check user preferences.
569 if _EXPLICIT_COLOR_SUPPORT is False:
570 # Colors disabled, nothing more to do.
571 _COLOR_SUPPORT = ColorSupport.NONE
572 return
573 elif _EXPLICIT_COLOR_SUPPORT is True:
574 # At least ANSI. Might improve later.
575 _COLOR_SUPPORT = max(ColorSupport.ANSI, _detect_color_support_from_env())
576 elif _EXPLICIT_COLOR_SUPPORT is None:
577 # At least NONE. Might improve later.
578 _COLOR_SUPPORT = _detect_color_support_from_env()
579 else:
580 # Exact color support is given.
581 _COLOR_SUPPORT = _EXPLICIT_COLOR_SUPPORT
583 if _TTY_OUTPUT is None:
584 # Can't find attached TTY output, hence can't improve color support.
585 return
587 if os.name == "nt":
588 # Try enabling true colors.
589 if _enable_vt_processing(_TTY_OUTPUT):
590 # Success, can improve color support.
591 if _EXPLICIT_COLOR_SUPPORT is None or _EXPLICIT_COLOR_SUPPORT is True:
592 _COLOR_SUPPORT = ColorSupport.ANSI_TRUE
593 else:
594 # Failure, this version of Windows does not support colors.
595 return
597 if _TTY_INPUT is None:
598 # Can't find attached TTY input, hence can't improve color support.
599 return
601 if not _is_foreground(_TTY_OUTPUT) or not _is_foreground(_TTY_INPUT):
602 # We're not a foreground process, we won't be able to fetch colors.
603 return
604 if detect_ci():
605 # We're in CI, we won't be able to fetch colors.
606 return
607 if not _is_tty(__stdin):
608 # We don't want to query colors if our stdin is redirected: this is a sign
609 # that this program runs in some sort of a pipeline, and multiple instances
610 # of it might run at the same time. If this happens, several processes/threads
611 # can interact with the same TTY, leading to garbled output.
612 return
614 if _COLOR_SUPPORT >= ColorSupport.ANSI:
615 # We were able to find TTY, and colors are supported.
616 # Try fetching terminal theme.
617 _TERMINAL_THEME = _get_standard_colors(_TTY_OUTPUT, _TTY_INPUT)
620def _find_tty():
621 global _TTY_OUTPUT, _TTY_INPUT
623 _TTY_OUTPUT = _TTY_INPUT = None
625 closer = contextlib.ExitStack()
626 try:
627 if os.name == "nt":
628 file_io_in = io._WindowsConsoleIO("CONIN$", "r") # type: ignore
629 tty_in = closer.enter_context(
630 io.TextIOWrapper(file_io_in, encoding="utf-8")
631 )
632 file_io_out = io._WindowsConsoleIO("CONOUT$", "w") # type: ignore
633 tty_out = closer.enter_context(
634 io.TextIOWrapper(file_io_out, encoding="utf-8")
635 )
636 else:
637 fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY)
638 file_io = io.FileIO(fd, "w+")
639 file_io.name = "/dev/tty"
640 tty_in = tty_out = closer.enter_context(io.TextIOWrapper(file_io))
641 except (OSError, AttributeError):
642 closer.close()
643 except:
644 closer.close()
645 raise
646 else:
647 atexit.register(closer.close)
648 _TTY_INPUT = tty_in
649 _TTY_OUTPUT = tty_out
650 return
652 for stream in (__stderr, __stdout):
653 if stream is not None and _output_is_tty(stream):
654 _TTY_OUTPUT = stream
655 break
656 if __stdin is not None and _input_is_tty(__stdin):
657 _TTY_INPUT = __stdin
660def _get_standard_colors(
661 ostream: _t.TextIO, istream: _t.TextIO
662) -> TerminalTheme | None:
663 if "YUIO_DISABLE_OSC_QUERIES" in os.environ:
664 return None
666 try:
667 query = "\x1b]10;?\x1b\\\x1b]11;?\x1b\\" + "".join(
668 [f"\x1b]4;{i};?\x1b\\" for i in range(16)]
669 )
670 response = _query_term(ostream, istream, query)
671 if not response:
672 return None
674 # Deal with foreground color.
676 match = re.match(
677 r"^\x1b]10;rgb:([0-9a-f]{2,4})/([0-9a-f]{2,4})/([0-9a-f]{2,4})(?:\x1b\\|\a)",
678 response,
679 re.IGNORECASE,
680 )
681 if match is None: # pragma: no cover
682 return None
684 r, g, b = (int(v, 16) // 16 ** (len(v) - 2) for v in match.groups())
685 foreground = yuio.color.ColorValue.from_rgb(r, g, b)
687 response = response[match.end() :]
689 # Deal with background color.
691 match = re.match(
692 r"^\x1b]11;rgb:([0-9a-f]{2,4})/([0-9a-f]{2,4})/([0-9a-f]{2,4})(?:\x1b\\|\a)",
693 response,
694 re.IGNORECASE,
695 )
696 if match is None: # pragma: no cover
697 return None
699 r, g, b = (int(v, 16) // 16 ** (len(v) - 2) for v in match.groups())
700 background = yuio.color.ColorValue.from_rgb(r, g, b)
701 luma = (0.2627 * r + 0.6780 * g + 0.0593 * b) / 256
703 if luma <= 0.2:
704 lightness = Lightness.DARK
705 elif luma >= 0.85:
706 lightness = Lightness.LIGHT
707 else:
708 lightness = Lightness.UNKNOWN
710 response = response[match.end() :]
712 # Deal with other colors
714 colors = {}
716 while response:
717 match = re.match(
718 r"^\x1b]4;(\d+);rgb:([0-9a-f]{2,4})/([0-9a-f]{2,4})/([0-9a-f]{2,4})(?:\x1b\\|\a)",
719 response,
720 re.IGNORECASE,
721 )
722 if match is None: # pragma: no cover
723 return None
725 c = int(match.group(1))
726 r, g, b = (int(v, 16) // 16 ** (len(v) - 2) for v in match.groups()[1:])
727 colors[c] = yuio.color.ColorValue.from_rgb(r, g, b)
729 response = response[match.end() :]
731 for k in range(8):
732 if k not in colors:
733 return None
735 # return colors
736 return TerminalTheme(
737 background=background,
738 foreground=foreground,
739 black=colors[0],
740 bright_black=colors.get(8 + 0, colors[0]),
741 red=colors[1],
742 bright_red=colors.get(8 + 1, colors[1]),
743 green=colors[2],
744 bright_green=colors.get(8 + 2, colors[2]),
745 yellow=colors[3],
746 bright_yellow=colors.get(8 + 3, colors[3]),
747 blue=colors[4],
748 bright_blue=colors.get(8 + 4, colors[4]),
749 magenta=colors[5],
750 bright_magenta=colors.get(8 + 5, colors[5]),
751 cyan=colors[6],
752 bright_cyan=colors.get(8 + 6, colors[6]),
753 white=colors[7],
754 bright_white=colors.get(8 + 7, colors[7]),
755 lightness=lightness,
756 )
758 except Exception: # pragma: no cover
759 return None
762def _query_term(ostream: _t.TextIO, istream: _t.TextIO, query: str) -> str | None:
763 try:
764 # Lock the keyboard.
765 ostream.write("\x1b[2h")
766 ostream.flush()
768 with _enter_raw_mode(ostream, istream):
769 _flush_input_buffer(ostream, istream)
771 # It is important that we unlock keyboard before exiting `cbreak`,
772 # hence the nested `try`.
773 try:
774 # Append a DA1 query, as virtually all terminals support it.
775 ostream.write(query + "\x1b[c")
776 ostream.flush()
778 buf = _read_keycode(ostream, istream, timeout=0.250)
779 if not buf.startswith("\x1b"):
780 yuio._logger.warning("_query_term invalid response: %r", buf)
781 return None
783 # Read till we find a DA1 response.
784 while not re.search(r"\x1b\[\?.*?c", buf):
785 buf += _read_keycode(ostream, istream, timeout=0.250)
787 return buf[: buf.index("\x1b[?")]
788 finally:
789 _flush_input_buffer(ostream, istream)
791 # Release the keyboard.
792 ostream.write("\x1b[2i")
793 ostream.flush()
794 except Exception: # pragma: no cover
795 yuio._logger.warning("_query_term error", exc_info=True)
796 return None
797 finally:
798 # Release the keyboard.
799 ostream.write("\x1b[2i")
800 ostream.flush()
803def _detect_explicit_color_settings() -> ColorSupport | bool | None:
804 color_support = None
806 if "FORCE_COLOR" in os.environ:
807 color_support = True
809 if "NO_COLOR" in os.environ or "FORCE_NO_COLOR" in os.environ:
810 color_support = False
812 # Note: we don't rely on argparse to parse flags and send them to us
813 # because these functions can be called before parsing arguments.
814 for arg in sys.argv[1:]:
815 if arg in ("--color", "--force-color"):
816 color_support = True
817 elif arg in ("--no-color", "--force-no-color"):
818 color_support = False
819 elif arg.startswith(("--color=", "--colors=")):
820 value = (
821 arg.split("=", maxsplit=1)[1]
822 .replace("_", "")
823 .replace("-", "")
824 .casefold()
825 )
826 if value in ["1", "yes", "true"]:
827 color_support = True
828 elif value in ["0", "no", "false"]:
829 color_support = False
830 elif value == "ansi":
831 color_support = ColorSupport.ANSI
832 elif value == "ansi256":
833 color_support = ColorSupport.ANSI_256
834 elif value == "ansitrue":
835 color_support = ColorSupport.ANSI_TRUE
837 return color_support
840def _detect_color_support_from_env() -> ColorSupport:
841 term = os.environ.get("TERM", "").lower()
842 colorterm = os.environ.get("COLORTERM", "").lower()
844 if detect_ci():
845 return detect_ci_color_support()
846 elif os.name == "nt":
847 return ColorSupport.NONE
848 elif colorterm in ("truecolor", "24bit") or term == "xterm-kitty":
849 return ColorSupport.ANSI_TRUE
850 elif colorterm in ("yes", "true") or "256color" in term or term == "screen":
851 if os.name == "posix" and term == "xterm-256color" and shutil.which("wslinfo"):
852 return ColorSupport.ANSI_TRUE
853 else:
854 return ColorSupport.ANSI_256
855 elif "linux" in term or "color" in term or "ansi" in term or "xterm" in term:
856 return ColorSupport.ANSI
858 return ColorSupport.NONE
861def _is_tty(stream: _t.TextIO | None) -> bool:
862 try:
863 return stream is not None and stream.isatty()
864 except Exception: # pragma: no cover
865 return False
868def _input_is_tty(stream: _t.TextIO | None) -> bool:
869 try:
870 return stream is not None and _is_tty(stream) and stream.readable()
871 except Exception: # pragma: no cover
872 return False
875def _output_is_tty(stream: _t.TextIO | None) -> bool:
876 try:
877 return stream is not None and _is_tty(stream) and stream.writable()
878 except Exception: # pragma: no cover
879 return False
882@contextlib.contextmanager
883def _modify_keyboard(
884 ostream: _t.TextIO,
885 bracketed_paste: bool = False,
886 modify_keyboard: bool = False,
887):
888 prologue = []
889 if bracketed_paste:
890 prologue.append("\x1b[?2004h")
891 if modify_keyboard:
892 prologue.append("\x1b[>1u")
893 if prologue:
894 ostream.write("".join(prologue))
895 ostream.flush()
896 try:
897 yield
898 finally:
899 epilog = []
900 if bracketed_paste:
901 epilog.append("\x1b[?2004l")
902 epilog.append("\x1b[<u")
903 if epilog:
904 ostream.write("".join(epilog))
905 ostream.flush()
908# Platform-specific code for working with terminals.
909if os.name == "posix":
910 import select
911 import signal
912 import termios
913 import tty
915 def _is_foreground(stream: _t.TextIO | None) -> bool:
916 try:
917 return stream is not None and os.getpgrp() == os.tcgetpgrp(stream.fileno())
918 except Exception: # pragma: no cover
919 return False
921 @contextlib.contextmanager
922 def _enter_raw_mode(
923 ostream: _t.TextIO,
924 istream: _t.TextIO,
925 bracketed_paste: bool = False,
926 modify_keyboard: bool = False,
927 ):
928 prev_mode = termios.tcgetattr(istream)
929 new_mode = prev_mode.copy()
930 new_mode[tty.LFLAG] &= ~(
931 termios.ECHO # Don't print back what user types.
932 | termios.ICANON # Disable line editing.
933 | termios.ISIG # Disable signals on C-c and C-z.
934 )
935 new_mode[tty.CC] = new_mode[tty.CC].copy()
936 new_mode[tty.CC][termios.VMIN] = 1
937 new_mode[tty.CC][termios.VTIME] = 0
938 termios.tcsetattr(istream, termios.TCSAFLUSH, new_mode)
940 try:
941 with _modify_keyboard(ostream, bracketed_paste, modify_keyboard):
942 yield
943 finally:
944 termios.tcsetattr(istream, termios.TCSAFLUSH, prev_mode)
946 def _read_keycode(
947 ostream: _t.TextIO, istream: _t.TextIO, timeout: float = 0
948 ) -> str:
949 if timeout and not bool(select.select([istream], [], [], timeout)[0]):
950 raise TimeoutError()
951 key = os.read(istream.fileno(), 128)
952 while bool(select.select([istream], [], [], 0)[0]):
953 key += os.read(istream.fileno(), 128)
955 return key.decode(istream.encoding, errors="replace")
957 def _flush_input_buffer(ostream: _t.TextIO, istream: _t.TextIO):
958 pass
960 def _enable_vt_processing(ostream: _t.TextIO) -> bool:
961 return False # This is a windows functionality
963 def _pause():
964 os.kill(os.getpid(), signal.SIGTSTP)
966elif os.name == "nt":
967 import ctypes
968 import ctypes.wintypes
969 import msvcrt
971 _FlushConsoleInputBuffer = ctypes.windll.kernel32.FlushConsoleInputBuffer
972 _FlushConsoleInputBuffer.argtypes = [ctypes.wintypes.HANDLE]
973 _FlushConsoleInputBuffer.restype = ctypes.wintypes.BOOL
975 _GetConsoleMode = ctypes.windll.kernel32.GetConsoleMode
976 _GetConsoleMode.argtypes = [ctypes.wintypes.HANDLE, ctypes.wintypes.LPDWORD]
977 _GetConsoleMode.restype = ctypes.wintypes.BOOL
979 _SetConsoleMode = ctypes.windll.kernel32.SetConsoleMode
980 _SetConsoleMode.argtypes = [ctypes.wintypes.HANDLE, ctypes.wintypes.DWORD]
981 _SetConsoleMode.restype = ctypes.wintypes.BOOL
983 _GetConsoleWindow = ctypes.windll.kernel32.GetConsoleWindow
984 _GetConsoleWindow.argtypes = []
985 _GetConsoleWindow.restype = ctypes.wintypes.HWND
987 _IsWindowVisible = ctypes.windll.user32.IsWindowVisible
988 _IsWindowVisible.argtypes = [ctypes.wintypes.HWND]
989 _SetConsoleMode.restype = ctypes.wintypes.BOOL
991 _ReadConsoleW = ctypes.windll.kernel32.ReadConsoleW
992 _ReadConsoleW.argtypes = [
993 ctypes.wintypes.HANDLE,
994 ctypes.wintypes.LPVOID,
995 ctypes.wintypes.DWORD,
996 ctypes.wintypes.LPDWORD,
997 ctypes.wintypes.LPVOID,
998 ]
999 _ReadConsoleW.restype = ctypes.wintypes.BOOL
1001 _ENABLE_PROCESSED_OUTPUT = 0x0001
1002 _ENABLE_WRAP_AT_EOL_OUTPUT = 0x0002
1003 _ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x0004
1004 _ENABLE_VIRTUAL_TERMINAL_INPUT = 0x0200
1006 _ISTREAM_HANDLE = None
1008 def _is_foreground(stream: _t.TextIO | None) -> bool:
1009 window = _GetConsoleWindow()
1010 if not window:
1011 return False
1012 return _IsWindowVisible(window)
1014 @contextlib.contextmanager
1015 def _enter_raw_mode(
1016 ostream: _t.TextIO,
1017 istream: _t.TextIO,
1018 bracketed_paste: bool = False,
1019 modify_keyboard: bool = False,
1020 ):
1021 global _ISTREAM_HANDLE
1023 if _ISTREAM_HANDLE is None:
1024 _prepare_tty()
1025 _ISTREAM_HANDLE = msvcrt.get_osfhandle((_TTY_INPUT or __stdin).fileno()) # type: ignore
1027 mode = ctypes.wintypes.DWORD()
1028 success = _GetConsoleMode(_ISTREAM_HANDLE, ctypes.byref(mode))
1029 if not success:
1030 raise ctypes.WinError()
1031 success = _SetConsoleMode(_ISTREAM_HANDLE, _ENABLE_VIRTUAL_TERMINAL_INPUT)
1032 if not success:
1033 raise ctypes.WinError()
1035 try:
1036 with _modify_keyboard(ostream, bracketed_paste, modify_keyboard):
1037 yield
1038 finally:
1039 success = _SetConsoleMode(_ISTREAM_HANDLE, mode)
1040 if not success:
1041 raise ctypes.WinError()
1043 def _read_keycode(
1044 ostream: _t.TextIO, istream: _t.TextIO, timeout: float = 0
1045 ) -> str:
1046 assert _ISTREAM_HANDLE is not None
1048 CHAR16 = ctypes.wintypes.WCHAR * 16
1050 n_read = ctypes.wintypes.DWORD()
1051 buffer = CHAR16()
1053 success = _ReadConsoleW(
1054 _ISTREAM_HANDLE,
1055 ctypes.byref(buffer),
1056 16,
1057 ctypes.byref(n_read),
1058 0,
1059 )
1060 if not success:
1061 raise ctypes.WinError()
1063 return buffer.value
1065 def _flush_input_buffer(ostream: _t.TextIO, istream: _t.TextIO):
1066 assert _ISTREAM_HANDLE is not None
1068 success = _FlushConsoleInputBuffer(_ISTREAM_HANDLE)
1069 if not success:
1070 raise ctypes.WinError()
1072 def _enable_vt_processing(ostream: _t.TextIO) -> bool:
1073 try:
1074 version = sys.getwindowsversion()
1075 if version.major < 10 or version.build < 14931:
1076 return False
1078 handle = msvcrt.get_osfhandle(ostream.fileno())
1079 return bool(
1080 _SetConsoleMode(
1081 handle,
1082 _ENABLE_PROCESSED_OUTPUT
1083 | _ENABLE_WRAP_AT_EOL_OUTPUT
1084 | _ENABLE_VIRTUAL_TERMINAL_PROCESSING,
1085 )
1086 )
1087 except Exception: # pragma: no cover
1088 return False
1090 def _pause():
1091 pass
1093else: # pragma: no cover
1095 def _is_foreground(stream: _t.TextIO | None) -> bool:
1096 return False
1098 @contextlib.contextmanager
1099 def _enter_raw_mode(
1100 ostream: _t.TextIO,
1101 istream: _t.TextIO,
1102 bracketed_paste: bool = False,
1103 modify_keyboard: bool = False,
1104 ):
1105 raise OSError("not supported")
1106 yield
1108 def _read_keycode(
1109 ostream: _t.TextIO, istream: _t.TextIO, timeout: float = 0
1110 ) -> str:
1111 raise OSError("not supported")
1113 def _flush_input_buffer(ostream: _t.TextIO, istream: _t.TextIO):
1114 raise OSError("not supported")
1116 def _enable_vt_processing(ostream: _t.TextIO) -> bool:
1117 raise OSError("not supported")
1119 def _pause():
1120 raise OSError("not supported")