Coverage for yuio / term.py: 70%

458 statements  

« prev     ^ index     » next       coverage.py v7.13.3, created at 2026-02-03 15:42 +0000

1# Yuio project, MIT license. 

2# 

3# https://github.com/taminomara/yuio/ 

4# 

5# You're free to copy this file to your project and edit it for your needs, 

6# just keep this copyright line please :3 

7 

8""" 

9Querying terminal info and working with ANSI terminals. 

10 

11This is a low-level module upon which :mod:`yuio.io` builds 

12its higher-level abstraction. 

13 

14 

15Detecting terminal capabilities 

16------------------------------- 

17 

18Terminal capabilities are stored in a :class:`Term` object. 

19 

20Usually, you don't need to query terminal capabilities yourself, 

21as you can use Yuio's global configuration from :mod:`yuio.io` 

22(see :func:`yuio.io.get_term`). 

23 

24However, you can get a :class:`Term` object by using :func:`get_term_from_stream`: 

25 

26.. autofunction:: get_term_from_stream 

27 

28.. autofunction:: get_tty 

29 

30.. autoclass:: Term 

31 :members: 

32 

33.. autoclass:: TerminalTheme 

34 :members: 

35 

36.. autoclass:: Lightness 

37 :members: 

38 

39 

40Utilities 

41--------- 

42 

43.. autofunction:: stream_is_unicode 

44 

45.. autofunction:: get_tty_size 

46 

47.. autofunction:: detect_ci 

48 

49.. autofunction:: detect_ci_color_support 

50 

51 

52Re-imports 

53---------- 

54 

55.. type:: ColorSupport 

56 :no-index: 

57 

58 Alias of :obj:`yuio.color.ColorSupport`. 

59 

60""" 

61 

62from __future__ import annotations 

63 

64import atexit 

65import contextlib 

66import dataclasses 

67import enum 

68import io 

69import locale 

70import os 

71import re 

72import shutil 

73import sys 

74import threading 

75from dataclasses import dataclass 

76 

77import yuio 

78import yuio.color 

79from yuio.color import ColorSupport 

80from yuio.util import ClosedIO as _ClosedIO 

81 

82from typing import TYPE_CHECKING 

83 

84if TYPE_CHECKING: 

85 import typing_extensions as _t 

86else: 

87 from yuio import _typing as _t 

88 

89__all__ = [ 

90 "ColorSupport", 

91 "Lightness", 

92 "Term", 

93 "TerminalTheme", 

94 "detect_ci", 

95 "detect_ci_color_support", 

96 "get_term_from_stream", 

97 "get_tty", 

98 "get_tty_size", 

99 "stream_is_unicode", 

100] 

101 

102T = _t.TypeVar("T") 

103 

104 

105_LOCK = threading.Lock() 

106 

107# These variables are set in `_prepare_tty`. 

108_TTY_SETUP_PERFORMED: bool = False 

109_TTY_OUTPUT: _t.TextIO | None 

110_TTY_INPUT: _t.TextIO | None 

111_TERMINAL_THEME: TerminalTheme | None 

112_EXPLICIT_COLOR_SUPPORT: ColorSupport | bool | None 

113_COLOR_SUPPORT: ColorSupport 

114 

115 

116# Redefine canonical streams so that we don't monkeypatch `sys.__std*__` in tests. 

117__stdin = sys.__stdin__ 

118__stdout = sys.__stdout__ 

119__stderr = sys.__stderr__ 

120 

121 

122class Lightness(enum.Enum): 

123 """ 

124 Overall color theme of a terminal. 

125 

126 Can help with deciding which colors to use when printing output. 

127 

128 """ 

129 

130 UNKNOWN = enum.auto() 

131 """ 

132 We couldn't determine terminal background, or it wasn't dark 

133 or bright enough to fall in one category or another. 

134 

135 """ 

136 

137 DARK = enum.auto() 

138 """ 

139 Terminal background is dark. 

140 

141 """ 

142 

143 LIGHT = enum.auto() 

144 """ 

145 Terminal background is light. 

146 

147 """ 

148 

149 

150@dataclass(frozen=True, slots=True) 

151class TerminalTheme: 

152 """ 

153 Colors and theme of the attached terminal. 

154 

155 """ 

156 

157 background: yuio.color.ColorValue 

158 """ 

159 Background color of a terminal. 

160 

161 """ 

162 

163 foreground: yuio.color.ColorValue 

164 """ 

165 Foreground color of a terminal. 

166 

167 """ 

168 

169 black: yuio.color.ColorValue 

170 """ 

171 Color value for the default "black" color. 

172 

173 """ 

174 

175 bright_black: yuio.color.ColorValue 

176 """ 

177 Color value for the default "bright black" color. 

178 

179 """ 

180 

181 red: yuio.color.ColorValue 

182 """ 

183 Color value for the default "red" color. 

184 

185 """ 

186 

187 bright_red: yuio.color.ColorValue 

188 """ 

189 Color value for the default "bright red" color. 

190 

191 """ 

192 

193 green: yuio.color.ColorValue 

194 """ 

195 Color value for the default "green" color. 

196 

197 """ 

198 

199 bright_green: yuio.color.ColorValue 

200 """ 

201 Color value for the default "bright green" color. 

202 

203 """ 

204 

205 yellow: yuio.color.ColorValue 

206 """ 

207 Color value for the default "yellow" color. 

208 

209 """ 

210 

211 bright_yellow: yuio.color.ColorValue 

212 """ 

213 Color value for the default "bright yellow" color. 

214 

215 """ 

216 

217 blue: yuio.color.ColorValue 

218 """ 

219 Color value for the default "blue" color. 

220 

221 """ 

222 

223 bright_blue: yuio.color.ColorValue 

224 """ 

225 Color value for the default "bright blue" color. 

226 

227 """ 

228 

229 magenta: yuio.color.ColorValue 

230 """ 

231 Color value for the default "magenta" color. 

232 

233 """ 

234 

235 bright_magenta: yuio.color.ColorValue 

236 """ 

237 Color value for the default "bright magenta" color. 

238 

239 """ 

240 

241 cyan: yuio.color.ColorValue 

242 """ 

243 Color value for the default "cyan" color. 

244 

245 """ 

246 

247 bright_cyan: yuio.color.ColorValue 

248 """ 

249 Color value for the default "bright cyan" color. 

250 

251 """ 

252 

253 white: yuio.color.ColorValue 

254 """ 

255 Color value for the default "white" color. 

256 

257 """ 

258 

259 bright_white: yuio.color.ColorValue 

260 """ 

261 Color value for the default "bright white" color. 

262 

263 """ 

264 

265 lightness: Lightness 

266 """ 

267 Overall color theme of a terminal, i.e. dark or light. 

268 

269 """ 

270 

271 

272@dataclass(frozen=True, slots=True) 

273class Term: 

274 """ 

275 This class contains all info about what kinds of things the terminal 

276 supports. If available, it will also have info about terminal's theme, 

277 i.e. dark or light background, etc. 

278 

279 """ 

280 

281 ostream: _t.TextIO 

282 """ 

283 Terminal's output stream. 

284 

285 """ 

286 

287 istream: _t.TextIO 

288 """ 

289 Terminal's input stream. 

290 

291 """ 

292 

293 color_support: ColorSupport = dataclasses.field( 

294 default=ColorSupport.NONE, kw_only=True 

295 ) 

296 """ 

297 Terminal's capability for coloring output. 

298 

299 """ 

300 

301 ostream_is_tty: bool = dataclasses.field(default=False, kw_only=True) 

302 """ 

303 Output is connecter to a terminal, and we're not in CI. 

304 

305 Note that output being connected to a TTY doesn't mean that it's interactive: 

306 this process can be in background. 

307 

308 """ 

309 

310 istream_is_tty: bool = dataclasses.field(default=False, kw_only=True) 

311 """ 

312 Output is connecter to a terminal, and we're not in CI. 

313 

314 Note that output being connected to a TTY doesn't mean that it's interactive: 

315 this process can be in background. 

316 

317 """ 

318 

319 terminal_theme: TerminalTheme | None = dataclasses.field(default=None, kw_only=True) 

320 """ 

321 Terminal's default foreground, background, and text colors. 

322 

323 """ 

324 

325 is_unicode: bool = dataclasses.field(default=False, kw_only=True) 

326 """ 

327 Terminal's output supports unicode characters. 

328 

329 """ 

330 

331 @property 

332 def supports_colors(self) -> bool: 

333 """ 

334 Return :data:`True` if terminal supports simple 8-bit color codes. 

335 

336 """ 

337 

338 return self.color_support >= ColorSupport.ANSI 

339 

340 @property 

341 def supports_colors_256(self) -> bool: 

342 """ 

343 Return :data:`True` if terminal supports 256-encoded colors. 

344 

345 """ 

346 

347 return self.color_support >= ColorSupport.ANSI_256 

348 

349 @property 

350 def supports_colors_true(self) -> bool: 

351 """ 

352 Return :data:`True` if terminal supports true colors. 

353 

354 """ 

355 

356 return self.color_support >= ColorSupport.ANSI_TRUE 

357 

358 @property 

359 def is_tty(self) -> bool: 

360 """ 

361 Return :data:`True` if input and output are connected to a TTY. In this mode 

362 we can interact with the user by writing and reading lines of text. 

363 

364 """ 

365 

366 return self.istream_is_tty and self.ostream_is_tty 

367 

368 @property 

369 def can_run_widgets(self) -> bool: 

370 """ 

371 Return :data:`True` if input and output are interactive and colors 

372 are supported. In this mode we can run interactive widgets. 

373 

374 """ 

375 

376 return self.color_support >= ColorSupport.ANSI and self.is_tty 

377 

378 @staticmethod 

379 def make_dummy(is_unicode: bool = True) -> Term: 

380 """ 

381 Make a dummy terminal with closed streams and no capabilities. 

382 

383 """ 

384 

385 stream = io.StringIO() 

386 stream.close() 

387 return Term( 

388 istream=_ClosedIO(), 

389 ostream=_ClosedIO(), 

390 is_unicode=is_unicode, 

391 ) 

392 

393 

394_CI_ENV_VARS = [ 

395 "TRAVIS", 

396 "CIRCLECI", 

397 "APPVEYOR", 

398 "GITLAB_CI", 

399 "BUILDKITE", 

400 "DRONE", 

401 "TEAMCITY_VERSION", 

402 "GITHUB_ACTIONS", 

403] 

404 

405 

406def stream_is_unicode(stream: _t.TextIO, /) -> bool: 

407 """ 

408 Determine of stream's encoding is some version of unicode. 

409 

410 """ 

411 

412 encoding = getattr(stream, "encoding", None) or locale.getpreferredencoding() or "" 

413 encoding = encoding.casefold() 

414 return "utf" in encoding or "unicode" in encoding 

415 

416 

417def get_tty_size(fallback: tuple[int, int] = (80, 24)): 

418 """ 

419 Like :func:`shutil.get_terminal_size`, but uses TTY stream if it's available. 

420 

421 :param fallback: 

422 tuple with width and height that will be used if query fails. 

423 

424 """ 

425 

426 _prepare_tty() 

427 

428 try: 

429 columns = int(os.environ["COLUMNS"]) 

430 except (KeyError, ValueError): 

431 columns = 0 

432 

433 try: 

434 lines = int(os.environ["LINES"]) 

435 except (KeyError, ValueError): 

436 lines = 0 

437 

438 if columns <= 0 or lines <= 0: 

439 try: 

440 size = os.get_terminal_size(_TTY_OUTPUT.fileno()) # type: ignore 

441 except (AttributeError, ValueError, OSError): 

442 # stream is closed, detached, or not a terminal, or 

443 # os.get_tty_size() is unsupported 

444 size = os.terminal_size(fallback) 

445 if columns <= 0: 

446 columns = size.columns or fallback[0] 

447 if lines <= 0: 

448 lines = size.lines or fallback[1] 

449 

450 return os.terminal_size((columns, lines)) 

451 

452 

453def detect_ci() -> bool: 

454 """ 

455 Scan environment variables to detect if we're in a known CI environment. 

456 

457 """ 

458 

459 return "CI" in os.environ or any(ci in os.environ for ci in _CI_ENV_VARS) 

460 

461 

462def detect_ci_color_support() -> ColorSupport: 

463 """ 

464 Scan environment variables to detect an appropriate level of color support 

465 of a CI environment. 

466 

467 If we're not in CI, return :attr:`ColorSupport.NONE <yuio.color.ColorSupport.NONE>`. 

468 

469 """ 

470 

471 if "GITHUB_ACTIONS" in os.environ: 

472 return ColorSupport.ANSI_TRUE 

473 elif any(ci in os.environ for ci in _CI_ENV_VARS): 

474 return ColorSupport.ANSI 

475 else: 

476 return ColorSupport.NONE 

477 

478 

479def get_tty() -> Term: 

480 """ 

481 Query info about TTY. 

482 

483 On Unix, this returns terminal connected to ``/dev/tty``. On Windows, this returns 

484 terminal connected to ``CONIN$``/``CONOUT$``. 

485 

486 If opening any of these fails, returns :class:`Term` with ``stdin``/``stdout`` 

487 as a fallback. 

488 

489 .. note:: 

490 

491 Prefer using ``stderr`` for normal IO: your users expect to be able to redirect 

492 messages from your program. 

493 

494 Only use ``/dev/tty`` for querying passwords or other things that must not 

495 be redirected. 

496 

497 """ 

498 

499 _prepare_tty() 

500 ostream = _TTY_OUTPUT or __stderr 

501 istream = _TTY_INPUT or __stdin 

502 assert ostream is not None 

503 assert istream is not None 

504 return get_term_from_stream(ostream, istream, allow_env_overrides=True) 

505 

506 

507def get_term_from_stream( 

508 ostream: _t.TextIO, 

509 istream: _t.TextIO | None = None, 

510 /, 

511 *, 

512 allow_env_overrides: bool = False, 

513) -> Term: 

514 """ 

515 Query info about a terminal attached to the given stream. 

516 

517 :param ostream: 

518 output stream. 

519 :param istream: 

520 input stream. If not given, widgets will not work with this terminal. 

521 :param allow_env_overrides: 

522 honor environment variables and CLI flags when determining capabilities 

523 of streams. 

524 

525 """ 

526 

527 is_unicode = stream_is_unicode(ostream) 

528 

529 if ( 

530 # For building docs in github. 

531 "YUIO_FORCE_FULL_TERM_SUPPORT" in os.environ and istream is not None 

532 ): # pragma: no cover 

533 return Term( 

534 ostream=ostream, 

535 istream=istream, 

536 color_support=ColorSupport.ANSI_TRUE, 

537 ostream_is_tty=True, 

538 istream_is_tty=True, 

539 is_unicode=is_unicode, 

540 ) 

541 

542 _prepare_tty() 

543 

544 output_is_tty = _output_is_tty(ostream) 

545 input_is_tty = _input_is_tty(istream) 

546 in_ci = detect_ci() 

547 

548 # Detect colors. 

549 if output_is_tty or (_EXPLICIT_COLOR_SUPPORT is not None and allow_env_overrides): 

550 color_support = _COLOR_SUPPORT 

551 else: 

552 color_support = ColorSupport.NONE 

553 

554 if istream is None: 

555 istream = _ClosedIO() 

556 

557 return Term( 

558 ostream=ostream, 

559 istream=istream, 

560 color_support=color_support, 

561 ostream_is_tty=output_is_tty and not in_ci, 

562 istream_is_tty=input_is_tty and not in_ci, 

563 terminal_theme=_TERMINAL_THEME, 

564 is_unicode=is_unicode, 

565 ) 

566 

567 

568def _prepare_tty(): 

569 if not _TTY_SETUP_PERFORMED: 

570 with _LOCK: 

571 if not _TTY_SETUP_PERFORMED: 

572 _do_prepare_tty() 

573 

574 

575def _do_prepare_tty(): 

576 global \ 

577 _TTY_SETUP_PERFORMED, \ 

578 _TERMINAL_THEME, \ 

579 _EXPLICIT_COLOR_SUPPORT, \ 

580 _COLOR_SUPPORT 

581 

582 _find_tty() 

583 

584 _TTY_SETUP_PERFORMED = True 

585 

586 # Theme is `None` for now, will query it later. 

587 _TERMINAL_THEME = None 

588 

589 # Find out if user specified `--color` or `FORCE_COLOR`. 

590 _EXPLICIT_COLOR_SUPPORT = _detect_explicit_color_settings() 

591 

592 # Check user preferences. 

593 if _EXPLICIT_COLOR_SUPPORT is False: 

594 # Colors disabled, nothing more to do. 

595 _COLOR_SUPPORT = ColorSupport.NONE 

596 return 

597 elif _EXPLICIT_COLOR_SUPPORT is True: 

598 # At least ANSI. Might improve later. 

599 _COLOR_SUPPORT = max(ColorSupport.ANSI, _detect_color_support_from_env()) 

600 elif _EXPLICIT_COLOR_SUPPORT is None: 

601 # At least NONE. Might improve later. 

602 _COLOR_SUPPORT = _detect_color_support_from_env() 

603 else: 

604 # Exact color support is given. 

605 _COLOR_SUPPORT = _EXPLICIT_COLOR_SUPPORT 

606 

607 if _TTY_OUTPUT is None: 

608 # Can't find attached TTY output, hence can't improve color support. 

609 return 

610 

611 if os.name == "nt": 

612 # Try enabling true colors. 

613 if _enable_vt_processing(_TTY_OUTPUT): 

614 # Success, can improve color support. 

615 if _EXPLICIT_COLOR_SUPPORT is None or _EXPLICIT_COLOR_SUPPORT is True: 

616 _COLOR_SUPPORT = ColorSupport.ANSI_TRUE 

617 else: 

618 # Failure, this version of Windows does not support colors. 

619 return 

620 

621 if _TTY_INPUT is None: 

622 # Can't find attached TTY input, hence can't improve color support. 

623 return 

624 

625 if not _is_foreground(_TTY_OUTPUT) or not _is_foreground(_TTY_INPUT): 

626 # We're not a foreground process, we won't be able to fetch colors. 

627 return 

628 if detect_ci(): 

629 # We're in CI, we won't be able to fetch colors. 

630 return 

631 if not _is_tty(__stdin): 

632 # We don't want to query colors if our stdin is redirected: this is a sign 

633 # that this program runs in some sort of a pipeline, and multiple instances 

634 # of it might run at the same time. If this happens, several processes/threads 

635 # can interact with the same TTY, leading to garbled output. 

636 return 

637 

638 if _COLOR_SUPPORT >= ColorSupport.ANSI: 

639 # We were able to find TTY, and colors are supported. 

640 # Try fetching terminal theme. 

641 _TERMINAL_THEME = _get_standard_colors(_TTY_OUTPUT, _TTY_INPUT) 

642 

643 

644def _find_tty(): 

645 global _TTY_OUTPUT, _TTY_INPUT 

646 

647 _TTY_OUTPUT = _TTY_INPUT = None 

648 

649 closer = contextlib.ExitStack() 

650 try: 

651 if os.name == "nt": 

652 file_io_in = io._WindowsConsoleIO("CONIN$", "r") # type: ignore 

653 tty_in = closer.enter_context( 

654 io.TextIOWrapper(file_io_in, encoding="utf-8") 

655 ) 

656 file_io_out = io._WindowsConsoleIO("CONOUT$", "w") # type: ignore 

657 tty_out = closer.enter_context( 

658 io.TextIOWrapper(file_io_out, encoding="utf-8") 

659 ) 

660 else: 

661 fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY) 

662 file_io = io.FileIO(fd, "w+") 

663 file_io.name = "/dev/tty" 

664 tty_in = tty_out = closer.enter_context(io.TextIOWrapper(file_io)) 

665 except (OSError, AttributeError): 

666 closer.close() 

667 except: 

668 closer.close() 

669 raise 

670 else: 

671 atexit.register(closer.close) 

672 _TTY_INPUT = tty_in 

673 _TTY_OUTPUT = tty_out 

674 return 

675 

676 for stream in (__stderr, __stdout): 

677 if stream is not None and _output_is_tty(stream): 

678 _TTY_OUTPUT = stream 

679 break 

680 if __stdin is not None and _input_is_tty(__stdin): 

681 _TTY_INPUT = __stdin 

682 

683 

684def _get_standard_colors( 

685 ostream: _t.TextIO, istream: _t.TextIO 

686) -> TerminalTheme | None: 

687 if "YUIO_DISABLE_OSC_QUERIES" in os.environ: 

688 return None 

689 

690 try: 

691 query = "\x1b]10;?\x1b\\\x1b]11;?\x1b\\" + "".join( 

692 [f"\x1b]4;{i};?\x1b\\" for i in range(16)] 

693 ) 

694 response = _query_term(ostream, istream, query) 

695 if not response: 

696 return None 

697 

698 # Deal with foreground color. 

699 

700 match = re.match( 

701 r"^\x1b]10;rgb:([0-9a-f]{2,4})/([0-9a-f]{2,4})/([0-9a-f]{2,4})(?:\x1b\\|\a)", 

702 response, 

703 re.IGNORECASE, 

704 ) 

705 if match is None: # pragma: no cover 

706 return None 

707 

708 r, g, b = (int(v, 16) // 16 ** (len(v) - 2) for v in match.groups()) 

709 foreground = yuio.color.ColorValue.from_rgb(r, g, b) 

710 

711 response = response[match.end() :] 

712 

713 # Deal with background color. 

714 

715 match = re.match( 

716 r"^\x1b]11;rgb:([0-9a-f]{2,4})/([0-9a-f]{2,4})/([0-9a-f]{2,4})(?:\x1b\\|\a)", 

717 response, 

718 re.IGNORECASE, 

719 ) 

720 if match is None: # pragma: no cover 

721 return None 

722 

723 r, g, b = (int(v, 16) // 16 ** (len(v) - 2) for v in match.groups()) 

724 background = yuio.color.ColorValue.from_rgb(r, g, b) 

725 luma = (0.2627 * r + 0.6780 * g + 0.0593 * b) / 256 

726 

727 if luma <= 0.2: 

728 lightness = Lightness.DARK 

729 elif luma >= 0.85: 

730 lightness = Lightness.LIGHT 

731 else: 

732 lightness = Lightness.UNKNOWN 

733 

734 response = response[match.end() :] 

735 

736 # Deal with other colors 

737 

738 colors = {} 

739 

740 while response: 

741 match = re.match( 

742 r"^\x1b]4;(\d+);rgb:([0-9a-f]{2,4})/([0-9a-f]{2,4})/([0-9a-f]{2,4})(?:\x1b\\|\a)", 

743 response, 

744 re.IGNORECASE, 

745 ) 

746 if match is None: # pragma: no cover 

747 return None 

748 

749 c = int(match.group(1)) 

750 r, g, b = (int(v, 16) // 16 ** (len(v) - 2) for v in match.groups()[1:]) 

751 colors[c] = yuio.color.ColorValue.from_rgb(r, g, b) 

752 

753 response = response[match.end() :] 

754 

755 for k in range(8): 

756 if k not in colors: 

757 return None 

758 

759 # return colors 

760 return TerminalTheme( 

761 background=background, 

762 foreground=foreground, 

763 black=colors[0], 

764 bright_black=colors.get(8 + 0, colors[0]), 

765 red=colors[1], 

766 bright_red=colors.get(8 + 1, colors[1]), 

767 green=colors[2], 

768 bright_green=colors.get(8 + 2, colors[2]), 

769 yellow=colors[3], 

770 bright_yellow=colors.get(8 + 3, colors[3]), 

771 blue=colors[4], 

772 bright_blue=colors.get(8 + 4, colors[4]), 

773 magenta=colors[5], 

774 bright_magenta=colors.get(8 + 5, colors[5]), 

775 cyan=colors[6], 

776 bright_cyan=colors.get(8 + 6, colors[6]), 

777 white=colors[7], 

778 bright_white=colors.get(8 + 7, colors[7]), 

779 lightness=lightness, 

780 ) 

781 

782 except Exception: # pragma: no cover 

783 return None 

784 

785 

786def _query_term(ostream: _t.TextIO, istream: _t.TextIO, query: str) -> str | None: 

787 try: 

788 # Lock the keyboard. 

789 ostream.write("\x1b[2h") 

790 ostream.flush() 

791 

792 with _enter_raw_mode(ostream, istream): 

793 _flush_input_buffer(ostream, istream) 

794 

795 # It is important that we unlock keyboard before exiting `cbreak`, 

796 # hence the nested `try`. 

797 try: 

798 # Append a DA1 query, as virtually all terminals support it. 

799 ostream.write(query + "\x1b[c") 

800 ostream.flush() 

801 

802 buf = _read_keycode(ostream, istream, timeout=0.250) 

803 if not buf.startswith("\x1b"): 

804 yuio._logger.warning("_query_term invalid response: %r", buf) 

805 return None 

806 

807 # Read till we find a DA1 response. 

808 while not re.search(r"\x1b\[\?.*?c", buf): 

809 buf += _read_keycode(ostream, istream, timeout=0.250) 

810 

811 return buf[: buf.index("\x1b[?")] 

812 finally: 

813 _flush_input_buffer(ostream, istream) 

814 

815 # Release the keyboard. 

816 ostream.write("\x1b[2i") 

817 ostream.flush() 

818 except Exception: # pragma: no cover 

819 yuio._logger.warning("_query_term error", exc_info=True) 

820 return None 

821 finally: 

822 # Release the keyboard. 

823 ostream.write("\x1b[2i") 

824 ostream.flush() 

825 

826 

827def _detect_explicit_color_settings() -> ColorSupport | bool | None: 

828 color_support = None 

829 

830 if "FORCE_COLOR" in os.environ: 

831 color_support = True 

832 

833 if "NO_COLOR" in os.environ or "FORCE_NO_COLOR" in os.environ: 

834 color_support = False 

835 

836 # Note: we don't rely on argparse to parse flags and send them to us 

837 # because these functions can be called before parsing arguments. 

838 for arg in sys.argv[1:]: 

839 if arg in ("--color", "--force-color"): 

840 color_support = True 

841 elif arg in ("--no-color", "--force-no-color"): 

842 color_support = False 

843 elif arg.startswith(("--color=", "--colors=")): 

844 value = ( 

845 arg.split("=", maxsplit=1)[1] 

846 .replace("_", "") 

847 .replace("-", "") 

848 .casefold() 

849 ) 

850 if value in ["1", "yes", "true"]: 

851 color_support = True 

852 elif value in ["0", "no", "false"]: 

853 color_support = False 

854 elif value == "ansi": 

855 color_support = ColorSupport.ANSI 

856 elif value == "ansi256": 

857 color_support = ColorSupport.ANSI_256 

858 elif value == "ansitrue": 

859 color_support = ColorSupport.ANSI_TRUE 

860 

861 return color_support 

862 

863 

864def _detect_color_support_from_env() -> ColorSupport: 

865 term = os.environ.get("TERM", "").lower() 

866 colorterm = os.environ.get("COLORTERM", "").lower() 

867 

868 if detect_ci(): 

869 return detect_ci_color_support() 

870 elif os.name == "nt": 

871 return ColorSupport.NONE 

872 elif colorterm in ("truecolor", "24bit") or term == "xterm-kitty": 

873 return ColorSupport.ANSI_TRUE 

874 elif colorterm in ("yes", "true") or "256color" in term or term == "screen": 

875 if os.name == "posix" and term == "xterm-256color" and shutil.which("wslinfo"): 

876 return ColorSupport.ANSI_TRUE 

877 else: 

878 return ColorSupport.ANSI_256 

879 elif "linux" in term or "color" in term or "ansi" in term or "xterm" in term: 

880 return ColorSupport.ANSI 

881 

882 return ColorSupport.NONE 

883 

884 

885def _is_tty(stream: _t.TextIO | None) -> bool: 

886 try: 

887 return stream is not None and stream.isatty() 

888 except Exception: # pragma: no cover 

889 return False 

890 

891 

892def _input_is_tty(stream: _t.TextIO | None) -> bool: 

893 try: 

894 return stream is not None and _is_tty(stream) and stream.readable() 

895 except Exception: # pragma: no cover 

896 return False 

897 

898 

899def _output_is_tty(stream: _t.TextIO | None) -> bool: 

900 try: 

901 return stream is not None and _is_tty(stream) and stream.writable() 

902 except Exception: # pragma: no cover 

903 return False 

904 

905 

906@contextlib.contextmanager 

907def _modify_keyboard( 

908 ostream: _t.TextIO, 

909 bracketed_paste: bool = False, 

910 modify_keyboard: bool = False, 

911): 

912 prologue = [] 

913 if bracketed_paste: 

914 prologue.append("\x1b[?2004h") 

915 if modify_keyboard: 

916 prologue.append("\x1b[>1u") 

917 if prologue: 

918 ostream.write("".join(prologue)) 

919 ostream.flush() 

920 try: 

921 yield 

922 finally: 

923 epilog = [] 

924 if bracketed_paste: 

925 epilog.append("\x1b[?2004l") 

926 epilog.append("\x1b[<u") 

927 if epilog: 

928 ostream.write("".join(epilog)) 

929 ostream.flush() 

930 

931 

932# Platform-specific code for working with terminals. 

933if os.name == "posix": 

934 import select 

935 import signal 

936 import termios 

937 import tty 

938 

939 def _is_foreground(stream: _t.TextIO | None) -> bool: 

940 try: 

941 return stream is not None and os.getpgrp() == os.tcgetpgrp(stream.fileno()) 

942 except Exception: # pragma: no cover 

943 return False 

944 

945 @contextlib.contextmanager 

946 def _enter_raw_mode( 

947 ostream: _t.TextIO, 

948 istream: _t.TextIO, 

949 bracketed_paste: bool = False, 

950 modify_keyboard: bool = False, 

951 ): 

952 prev_mode = termios.tcgetattr(istream) 

953 new_mode = prev_mode.copy() 

954 new_mode[tty.LFLAG] &= ~( 

955 termios.ECHO # Don't print back what user types. 

956 | termios.ICANON # Disable line editing. 

957 | termios.ISIG # Disable signals on C-c and C-z. 

958 ) 

959 new_mode[tty.CC] = new_mode[tty.CC].copy() 

960 new_mode[tty.CC][termios.VMIN] = 1 

961 new_mode[tty.CC][termios.VTIME] = 0 

962 termios.tcsetattr(istream, termios.TCSAFLUSH, new_mode) 

963 

964 try: 

965 with _modify_keyboard(ostream, bracketed_paste, modify_keyboard): 

966 yield 

967 finally: 

968 termios.tcsetattr(istream, termios.TCSAFLUSH, prev_mode) 

969 

970 def _read_keycode( 

971 ostream: _t.TextIO, istream: _t.TextIO, timeout: float = 0 

972 ) -> str: 

973 if timeout and not bool(select.select([istream], [], [], timeout)[0]): 

974 raise TimeoutError() 

975 key = os.read(istream.fileno(), 128) 

976 while bool(select.select([istream], [], [], 0)[0]): 

977 key += os.read(istream.fileno(), 128) 

978 

979 return key.decode(istream.encoding, errors="replace") 

980 

981 def _flush_input_buffer(ostream: _t.TextIO, istream: _t.TextIO): 

982 pass 

983 

984 def _enable_vt_processing(ostream: _t.TextIO) -> bool: 

985 return False # This is a windows functionality 

986 

987 def _pause(): 

988 os.kill(os.getpid(), signal.SIGTSTP) 

989 

990elif os.name == "nt": 

991 import ctypes 

992 import ctypes.wintypes 

993 import msvcrt 

994 

995 _FlushConsoleInputBuffer = ctypes.windll.kernel32.FlushConsoleInputBuffer 

996 _FlushConsoleInputBuffer.argtypes = [ctypes.wintypes.HANDLE] 

997 _FlushConsoleInputBuffer.restype = ctypes.wintypes.BOOL 

998 

999 _GetConsoleMode = ctypes.windll.kernel32.GetConsoleMode 

1000 _GetConsoleMode.argtypes = [ctypes.wintypes.HANDLE, ctypes.wintypes.LPDWORD] 

1001 _GetConsoleMode.restype = ctypes.wintypes.BOOL 

1002 

1003 _SetConsoleMode = ctypes.windll.kernel32.SetConsoleMode 

1004 _SetConsoleMode.argtypes = [ctypes.wintypes.HANDLE, ctypes.wintypes.DWORD] 

1005 _SetConsoleMode.restype = ctypes.wintypes.BOOL 

1006 

1007 _GetConsoleWindow = ctypes.windll.kernel32.GetConsoleWindow 

1008 _GetConsoleWindow.argtypes = [] 

1009 _GetConsoleWindow.restype = ctypes.wintypes.HWND 

1010 

1011 _IsWindowVisible = ctypes.windll.user32.IsWindowVisible 

1012 _IsWindowVisible.argtypes = [ctypes.wintypes.HWND] 

1013 _SetConsoleMode.restype = ctypes.wintypes.BOOL 

1014 

1015 _ReadConsoleW = ctypes.windll.kernel32.ReadConsoleW 

1016 _ReadConsoleW.argtypes = [ 

1017 ctypes.wintypes.HANDLE, 

1018 ctypes.wintypes.LPVOID, 

1019 ctypes.wintypes.DWORD, 

1020 ctypes.wintypes.LPDWORD, 

1021 ctypes.wintypes.LPVOID, 

1022 ] 

1023 _ReadConsoleW.restype = ctypes.wintypes.BOOL 

1024 

1025 _ENABLE_PROCESSED_OUTPUT = 0x0001 

1026 _ENABLE_WRAP_AT_EOL_OUTPUT = 0x0002 

1027 _ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x0004 

1028 _ENABLE_VIRTUAL_TERMINAL_INPUT = 0x0200 

1029 

1030 _ISTREAM_HANDLE = None 

1031 

1032 def _is_foreground(stream: _t.TextIO | None) -> bool: 

1033 window = _GetConsoleWindow() 

1034 if not window: 

1035 return False 

1036 return _IsWindowVisible(window) 

1037 

1038 @contextlib.contextmanager 

1039 def _enter_raw_mode( 

1040 ostream: _t.TextIO, 

1041 istream: _t.TextIO, 

1042 bracketed_paste: bool = False, 

1043 modify_keyboard: bool = False, 

1044 ): 

1045 global _ISTREAM_HANDLE 

1046 

1047 if _ISTREAM_HANDLE is None: 

1048 _prepare_tty() 

1049 _ISTREAM_HANDLE = msvcrt.get_osfhandle((_TTY_INPUT or __stdin).fileno()) # type: ignore 

1050 

1051 mode = ctypes.wintypes.DWORD() 

1052 success = _GetConsoleMode(_ISTREAM_HANDLE, ctypes.byref(mode)) 

1053 if not success: 

1054 raise ctypes.WinError() 

1055 success = _SetConsoleMode(_ISTREAM_HANDLE, _ENABLE_VIRTUAL_TERMINAL_INPUT) 

1056 if not success: 

1057 raise ctypes.WinError() 

1058 

1059 try: 

1060 with _modify_keyboard(ostream, bracketed_paste, modify_keyboard): 

1061 yield 

1062 finally: 

1063 success = _SetConsoleMode(_ISTREAM_HANDLE, mode) 

1064 if not success: 

1065 raise ctypes.WinError() 

1066 

1067 def _read_keycode( 

1068 ostream: _t.TextIO, istream: _t.TextIO, timeout: float = 0 

1069 ) -> str: 

1070 assert _ISTREAM_HANDLE is not None 

1071 

1072 CHAR16 = ctypes.wintypes.WCHAR * 16 

1073 

1074 n_read = ctypes.wintypes.DWORD() 

1075 buffer = CHAR16() 

1076 

1077 success = _ReadConsoleW( 

1078 _ISTREAM_HANDLE, 

1079 ctypes.byref(buffer), 

1080 16, 

1081 ctypes.byref(n_read), 

1082 0, 

1083 ) 

1084 if not success: 

1085 raise ctypes.WinError() 

1086 

1087 return buffer.value 

1088 

1089 def _flush_input_buffer(ostream: _t.TextIO, istream: _t.TextIO): 

1090 assert _ISTREAM_HANDLE is not None 

1091 

1092 success = _FlushConsoleInputBuffer(_ISTREAM_HANDLE) 

1093 if not success: 

1094 raise ctypes.WinError() 

1095 

1096 def _enable_vt_processing(ostream: _t.TextIO) -> bool: 

1097 try: 

1098 version = sys.getwindowsversion() 

1099 if version.major < 10 or version.build < 14931: 

1100 return False 

1101 

1102 handle = msvcrt.get_osfhandle(ostream.fileno()) 

1103 return bool( 

1104 _SetConsoleMode( 

1105 handle, 

1106 _ENABLE_PROCESSED_OUTPUT 

1107 | _ENABLE_WRAP_AT_EOL_OUTPUT 

1108 | _ENABLE_VIRTUAL_TERMINAL_PROCESSING, 

1109 ) 

1110 ) 

1111 except Exception: # pragma: no cover 

1112 return False 

1113 

1114 def _pause(): 

1115 pass 

1116 

1117else: # pragma: no cover 

1118 

1119 def _is_foreground(stream: _t.TextIO | None) -> bool: 

1120 return False 

1121 

1122 @contextlib.contextmanager 

1123 def _enter_raw_mode( 

1124 ostream: _t.TextIO, 

1125 istream: _t.TextIO, 

1126 bracketed_paste: bool = False, 

1127 modify_keyboard: bool = False, 

1128 ): 

1129 raise OSError("not supported") 

1130 yield 

1131 

1132 def _read_keycode( 

1133 ostream: _t.TextIO, istream: _t.TextIO, timeout: float = 0 

1134 ) -> str: 

1135 raise OSError("not supported") 

1136 

1137 def _flush_input_buffer(ostream: _t.TextIO, istream: _t.TextIO): 

1138 raise OSError("not supported") 

1139 

1140 def _enable_vt_processing(ostream: _t.TextIO) -> bool: 

1141 raise OSError("not supported") 

1142 

1143 def _pause(): 

1144 raise OSError("not supported")