Coverage for yuio / term.py: 69%

461 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-29 19:55 +0000

1# Yuio project, MIT license. 

2# 

3# https://github.com/taminomara/yuio/ 

4# 

5# You're free to copy this file to your project and edit it for your needs, 

6# just keep this copyright line please :3 

7 

8""" 

9Querying terminal info and working with ANSI terminals. 

10 

11This is a low-level module upon which :mod:`yuio.io` builds 

12its higher-level abstraction. 

13 

14 

15Detecting terminal capabilities 

16------------------------------- 

17 

18Terminal capabilities are stored in a :class:`Term` object. 

19 

20Usually, you don't need to query terminal capabilities yourself, 

21as you can use Yuio's global configuration from :mod:`yuio.io` 

22(see :func:`yuio.io.get_term`). 

23 

24However, you can get a :class:`Term` object by using :func:`get_term_from_stream`: 

25 

26.. autofunction:: get_term_from_stream 

27 

28.. autofunction:: get_tty 

29 

30.. autoclass:: Term 

31 :members: 

32 

33.. autoclass:: TerminalTheme 

34 :members: 

35 

36.. autoclass:: Lightness 

37 :members: 

38 

39 

40Utilities 

41--------- 

42 

43.. autofunction:: stream_is_unicode 

44 

45.. autofunction:: get_tty_size 

46 

47.. autofunction:: detect_ci 

48 

49.. autofunction:: detect_ci_color_support 

50 

51 

52Re-imports 

53---------- 

54 

55.. type:: ColorSupport 

56 :no-index: 

57 

58 Alias of :obj:`yuio.color.ColorSupport`. 

59 

60""" 

61 

62from __future__ import annotations 

63 

64import atexit 

65import contextlib 

66import dataclasses 

67import enum 

68import io 

69import locale 

70import os 

71import re 

72import shutil 

73import sys 

74import threading 

75from dataclasses import dataclass 

76 

77import yuio 

78import yuio.color 

79from yuio.color import ColorSupport 

80from yuio.util import ClosedIO as _ClosedIO 

81 

82from typing import TYPE_CHECKING 

83 

84if TYPE_CHECKING: 

85 import typing_extensions as _t 

86else: 

87 from yuio import _typing as _t 

88 

# Names exported by `from <this module> import *`; mirrors the functions and
# classes listed in the module docstring, plus the re-exported `ColorSupport`.
__all__ = [
    "ColorSupport",
    "Lightness",
    "Term",
    "TerminalTheme",
    "detect_ci",
    "detect_ci_color_support",
    "get_term_from_stream",
    "get_tty",
    "get_tty_size",
    "stream_is_unicode",
]

101 

T = _t.TypeVar("T")


# Guards the one-time TTY setup performed by `_prepare_tty`.
_LOCK = threading.Lock()

# These variables are set in `_prepare_tty` (via `_do_prepare_tty`
# and `_find_tty`). Only the flag has an initial value; the rest are declared
# here for type checkers and stay unbound until setup has run.
_TTY_SETUP_PERFORMED: bool = False
_TTY_OUTPUT: _t.TextIO | None
_TTY_INPUT: _t.TextIO | None
_TERMINAL_THEME: TerminalTheme | None
# `True`/`False` for a plain on/off request, a `ColorSupport` level for an exact
# request, `None` when the user did not ask for anything.
_EXPLICIT_COLOR_SUPPORT: ColorSupport | bool | None
_COLOR_SUPPORT: ColorSupport


# Redefine canonical streams so that we don't monkeypatch `sys.__std*__` in tests.
__stdin = sys.__stdin__
__stdout = sys.__stdout__
__stderr = sys.__stderr__

120 

121 

@enum.unique
class Lightness(enum.Enum):
    """
    Coarse classification of a terminal's background brightness.

    Useful for picking colors that stay readable on the user's background.

    """

    UNKNOWN = enum.auto()
    """
    Background could not be queried, or its brightness falls between
    the "dark" and "light" thresholds.

    """

    DARK = enum.auto()
    """
    Terminal uses a dark background.

    """

    LIGHT = enum.auto()
    """
    Terminal uses a light background.

    """

148 

149 

150@dataclass(frozen=True, slots=True) 

151class TerminalTheme: 

152 """ 

153 Colors and theme of the attached terminal. 

154 

155 """ 

156 

157 background: yuio.color.ColorValue 

158 """ 

159 Background color of a terminal. 

160 

161 """ 

162 

163 foreground: yuio.color.ColorValue 

164 """ 

165 Foreground color of a terminal. 

166 

167 """ 

168 

169 black: yuio.color.ColorValue 

170 """ 

171 Color value for the default "black" color. 

172 

173 """ 

174 

175 bright_black: yuio.color.ColorValue 

176 """ 

177 Color value for the default "bright black" color. 

178 

179 """ 

180 

181 red: yuio.color.ColorValue 

182 """ 

183 Color value for the default "red" color. 

184 

185 """ 

186 

187 bright_red: yuio.color.ColorValue 

188 """ 

189 Color value for the default "bright red" color. 

190 

191 """ 

192 

193 green: yuio.color.ColorValue 

194 """ 

195 Color value for the default "green" color. 

196 

197 """ 

198 

199 bright_green: yuio.color.ColorValue 

200 """ 

201 Color value for the default "bright green" color. 

202 

203 """ 

204 

205 yellow: yuio.color.ColorValue 

206 """ 

207 Color value for the default "yellow" color. 

208 

209 """ 

210 

211 bright_yellow: yuio.color.ColorValue 

212 """ 

213 Color value for the default "bright yellow" color. 

214 

215 """ 

216 

217 blue: yuio.color.ColorValue 

218 """ 

219 Color value for the default "blue" color. 

220 

221 """ 

222 

223 bright_blue: yuio.color.ColorValue 

224 """ 

225 Color value for the default "bright blue" color. 

226 

227 """ 

228 

229 magenta: yuio.color.ColorValue 

230 """ 

231 Color value for the default "magenta" color. 

232 

233 """ 

234 

235 bright_magenta: yuio.color.ColorValue 

236 """ 

237 Color value for the default "bright magenta" color. 

238 

239 """ 

240 

241 cyan: yuio.color.ColorValue 

242 """ 

243 Color value for the default "cyan" color. 

244 

245 """ 

246 

247 bright_cyan: yuio.color.ColorValue 

248 """ 

249 Color value for the default "bright cyan" color. 

250 

251 """ 

252 

253 white: yuio.color.ColorValue 

254 """ 

255 Color value for the default "white" color. 

256 

257 """ 

258 

259 bright_white: yuio.color.ColorValue 

260 """ 

261 Color value for the default "bright white" color. 

262 

263 """ 

264 

265 lightness: Lightness 

266 """ 

267 Overall color theme of a terminal, i.e. dark or light. 

268 

269 """ 

270 

271 

@dataclass(frozen=True, slots=True)
class Term:
    """
    Description of a terminal: the streams used to talk to it and the
    capabilities that were detected for them. When available, it also carries
    the terminal's theme (default colors and dark/light background).

    """

    ostream: _t.TextIO
    """
    Terminal's output stream.

    """

    istream: _t.TextIO
    """
    Terminal's input stream.

    """

    color_support: ColorSupport = dataclasses.field(
        default=ColorSupport.NONE, kw_only=True
    )
    """
    Terminal's capability for coloring output.

    """

    ostream_is_tty: bool = dataclasses.field(default=False, kw_only=True)
    """
    Output is connected to a terminal, and we're not in CI.

    Note that output being connected to a TTY doesn't mean that it's interactive:
    this process can be in background.

    """

    istream_is_tty: bool = dataclasses.field(default=False, kw_only=True)
    """
    Input is connected to a terminal, and we're not in CI.

    Note that input being connected to a TTY doesn't mean that it's interactive:
    this process can be in background.

    """

    terminal_theme: TerminalTheme | None = dataclasses.field(default=None, kw_only=True)
    """
    Terminal's default foreground, background, and text colors,
    or :data:`None` if they were not queried.

    """

    is_unicode: bool = dataclasses.field(default=False, kw_only=True)
    """
    Terminal's output supports unicode characters.

    """

    @property
    def supports_colors(self) -> bool:
        """
        Return :data:`True` if terminal supports at least basic ANSI color codes.

        """

        return self.color_support >= ColorSupport.ANSI

    @property
    def supports_colors_256(self) -> bool:
        """
        Return :data:`True` if terminal supports 256-encoded colors.

        """

        return self.color_support >= ColorSupport.ANSI_256

    @property
    def supports_colors_true(self) -> bool:
        """
        Return :data:`True` if terminal supports true colors.

        """

        return self.color_support >= ColorSupport.ANSI_TRUE

    @property
    def is_tty(self) -> bool:
        """
        Return :data:`True` if input and output are connected to a TTY. In this mode
        we can interact with the user by writing and reading lines of text.

        """

        return self.istream_is_tty and self.ostream_is_tty

    @property
    def can_run_widgets(self) -> bool:
        """
        Return :data:`True` if input and output are interactive and colors
        are supported. In this mode we can run interactive widgets.

        """

        return self.color_support >= ColorSupport.ANSI and self.is_tty

    @staticmethod
    def make_dummy(is_unicode: bool = True) -> Term:
        """
        Make a dummy terminal with closed streams and no capabilities.

        :param is_unicode:
            value for the :attr:`is_unicode` flag of the returned terminal.

        """

        # Both streams are independent closed placeholders; every other
        # capability keeps its "nothing supported" default.
        return Term(
            istream=_ClosedIO(),
            ostream=_ClosedIO(),
            is_unicode=is_unicode,
        )

392 

393 

# Environment variables whose mere presence marks a specific CI system.
# Consulted by `detect_ci` and `detect_ci_color_support`.
_CI_ENV_VARS = [
    "TRAVIS",
    "CIRCLECI",
    "APPVEYOR",
    "GITLAB_CI",
    "BUILDKITE",
    "DRONE",
    "TEAMCITY_VERSION",
    "GITHUB_ACTIONS",
]

404 

405 

def stream_is_unicode(stream: _t.TextIO, /) -> bool:
    """
    Determine if stream's encoding is some version of unicode.

    The stream's own ``encoding`` attribute is used when it is set; otherwise
    the locale's preferred encoding is consulted.

    """

    declared = getattr(stream, "encoding", None)
    name = (declared or locale.getpreferredencoding() or "").casefold()
    return any(marker in name for marker in ("utf", "unicode"))

415 

416 

def get_tty_size(fallback: tuple[int, int] = (80, 24)):
    """
    Like :func:`shutil.get_terminal_size`, but uses TTY stream if it's available.

    ``COLUMNS`` and ``LINES`` environment variables take priority; whichever
    dimension they don't provide is taken from the TTY, and then from `fallback`.

    :param fallback:
        tuple with width and height that will be used if query fails.

    """

    _prepare_tty()

    def env_dimension(name):
        # Missing or non-numeric values count as "not provided".
        try:
            return int(os.environ[name])
        except (KeyError, ValueError):
            return 0

    columns = env_dimension("COLUMNS")
    lines = env_dimension("LINES")

    if columns > 0 and lines > 0:
        # Environment fully specifies the size, no need to ask the terminal.
        return os.terminal_size((columns, lines))

    try:
        queried = os.get_terminal_size(_TTY_OUTPUT.fileno())  # type: ignore
    except (AttributeError, ValueError, OSError):
        # stream is closed, detached, or not a terminal, or
        # os.get_tty_size() is unsupported
        queried = os.terminal_size(fallback)

    if columns <= 0:
        columns = queried.columns or fallback[0]
    if lines <= 0:
        lines = queried.lines or fallback[1]

    return os.terminal_size((columns, lines))

451 

452 

# Cached result of `detect_ci`; `None` until the first call.
_IN_CI = None


def detect_ci() -> bool:
    """
    Scan environment variables to detect if we're in a known CI environment.

    The environment is inspected once; later calls return the cached answer.

    """

    global _IN_CI

    if _IN_CI is not None:
        return _IN_CI

    if "CI" in os.environ:
        _IN_CI = True
    else:
        _IN_CI = any(name in os.environ for name in _CI_ENV_VARS)

    return _IN_CI

468 

469 

def detect_ci_color_support() -> ColorSupport:
    """
    Scan environment variables to detect an appropriate level of color support
    of a CI environment.

    GitHub Actions gets true color; any other CI listed in ``_CI_ENV_VARS``
    gets basic ANSI colors.

    If none of those variables is set, return
    :attr:`ColorSupport.NONE <yuio.color.ColorSupport.NONE>`.

    """

    env = os.environ

    if "GITHUB_ACTIONS" in env:
        return ColorSupport.ANSI_TRUE

    for name in _CI_ENV_VARS:
        if name in env:
            return ColorSupport.ANSI

    return ColorSupport.NONE

485 

486 

def get_tty() -> Term:
    """
    Query info about TTY.

    On Unix, this returns terminal connected to ``/dev/tty``. On Windows, this returns
    terminal connected to ``CONIN$``/``CONOUT$``.

    If opening any of these fails, returns :class:`Term` with ``stdin``/``stderr``
    as a fallback. If the process has no standard streams at all, closed
    placeholder streams are used instead, and the resulting terminal
    has no capabilities.

    .. note::

        Prefer using ``stderr`` for normal IO: your users expect to be able to redirect
        messages from your program.

        Only use ``/dev/tty`` for querying passwords or other things that must not
        be redirected.

    """

    _prepare_tty()

    ostream = _TTY_OUTPUT or __stderr
    istream = _TTY_INPUT or __stdin

    # `sys.__stderr__`/`sys.__stdin__` can be `None` (e.g. a process started
    # without a console). An `assert` would either crash here or, under `-O`,
    # let `None` through, so substitute closed streams explicitly.
    if ostream is None:
        ostream = _ClosedIO()
    if istream is None:
        istream = _ClosedIO()

    return get_term_from_stream(ostream, istream, allow_env_overrides=True)

513 

514 

def get_term_from_stream(
    ostream: _t.TextIO,
    istream: _t.TextIO | None = None,
    /,
    *,
    allow_env_overrides: bool = False,
) -> Term:
    """
    Build a :class:`Term` describing the terminal behind the given streams.

    :param ostream:
        output stream.
    :param istream:
        input stream. If not given, a closed placeholder stream is used,
        and widgets will not work with this terminal.
    :param allow_env_overrides:
        honor environment variables and CLI flags when determining capabilities
        of streams.

    """

    is_unicode = stream_is_unicode(ostream)

    # For building docs in github.
    force_full_support = (
        "YUIO_FORCE_FULL_TERM_SUPPORT" in os.environ and istream is not None
    )
    if force_full_support:  # pragma: no cover
        return Term(
            ostream=ostream,
            istream=istream,
            color_support=ColorSupport.ANSI_TRUE,
            ostream_is_tty=True,
            istream_is_tty=True,
            is_unicode=is_unicode,
        )

    _prepare_tty()

    output_is_tty = _output_is_tty(ostream)
    input_is_tty = _input_is_tty(istream)
    in_ci = detect_ci()

    # Colors are enabled for real terminals, or when the user explicitly
    # asked for a color mode and the caller lets such requests through.
    explicit_request = allow_env_overrides and _EXPLICIT_COLOR_SUPPORT is not None
    color_support = (
        _COLOR_SUPPORT if output_is_tty or explicit_request else ColorSupport.NONE
    )

    return Term(
        ostream=ostream,
        istream=istream if istream is not None else _ClosedIO(),
        color_support=color_support,
        # Streams in CI are never reported as TTYs.
        ostream_is_tty=output_is_tty and not in_ci,
        istream_is_tty=input_is_tty and not in_ci,
        terminal_theme=_TERMINAL_THEME,
        is_unicode=is_unicode,
    )

574 

575 

def _prepare_tty():
    """
    Run the one-time TTY setup if it hasn't been performed yet.

    """

    # Cheap check first, so that the common path doesn't touch the lock.
    if _TTY_SETUP_PERFORMED:
        return

    with _LOCK:
        # Another thread may have finished setup while we waited for the lock.
        if _TTY_SETUP_PERFORMED:
            return
        _do_prepare_tty()

581 

582 

def _do_prepare_tty():
    """
    Initialize module-level TTY state: locate TTY streams, decide on the color
    support level, and, when circumstances allow, query the terminal theme.

    Invoked from `_prepare_tty` while `_LOCK` is held.

    """

    global \
        _TTY_SETUP_PERFORMED, \
        _TERMINAL_THEME, \
        _EXPLICIT_COLOR_SUPPORT, \
        _COLOR_SUPPORT

    # Populates `_TTY_OUTPUT` and `_TTY_INPUT`.
    _find_tty()

    # NOTE(review): the flag is raised before the rest of the setup runs.
    # Presumably this lets nested `_prepare_tty()` calls made during the theme
    # query (the Windows `_enter_raw_mode` makes one) return immediately
    # instead of blocking on `_LOCK` — confirm before reordering.
    _TTY_SETUP_PERFORMED = True

    # Theme is `None` for now, will query it later.
    _TERMINAL_THEME = None

    # Find out if user specified `--color` or `FORCE_COLOR`.
    _EXPLICIT_COLOR_SUPPORT = _detect_explicit_color_settings()

    # Check user preferences.
    if _EXPLICIT_COLOR_SUPPORT is False:
        # Colors disabled, nothing more to do.
        _COLOR_SUPPORT = ColorSupport.NONE
        return
    elif _EXPLICIT_COLOR_SUPPORT is True:
        # At least ANSI. Might improve later.
        _COLOR_SUPPORT = max(ColorSupport.ANSI, _detect_color_support_from_env())
    elif _EXPLICIT_COLOR_SUPPORT is None:
        # At least NONE. Might improve later.
        _COLOR_SUPPORT = _detect_color_support_from_env()
    else:
        # Exact color support is given.
        _COLOR_SUPPORT = _EXPLICIT_COLOR_SUPPORT

    # From here on `_COLOR_SUPPORT` is always assigned, so every early return
    # below leaves the module state complete (just without a theme).

    if _TTY_OUTPUT is None:
        # Can't find attached TTY output, hence can't improve color support.
        return

    if os.name == "nt":
        # Try enabling true colors.
        if _enable_vt_processing(_TTY_OUTPUT):
            # Success, can improve color support.
            # An exact level requested by the user is left untouched.
            if _EXPLICIT_COLOR_SUPPORT is None or _EXPLICIT_COLOR_SUPPORT is True:
                _COLOR_SUPPORT = ColorSupport.ANSI_TRUE
        else:
            # Failure, this version of Windows does not support colors.
            return

    if _TTY_INPUT is None:
        # Can't find attached TTY input, hence can't improve color support.
        return

    if not _is_foreground(_TTY_OUTPUT) or not _is_foreground(_TTY_INPUT):
        # We're not a foreground process, we won't be able to fetch colors.
        return
    if detect_ci():
        # We're in CI, we won't be able to fetch colors.
        return
    if not _is_tty(__stdin):
        # We don't want to query colors if our stdin is redirected: this is a sign
        # that this program runs in some sort of a pipeline, and multiple instances
        # of it might run at the same time. If this happens, several processes/threads
        # can interact with the same TTY, leading to garbled output.
        return

    if _COLOR_SUPPORT >= ColorSupport.ANSI:
        # We were able to find TTY, and colors are supported.
        # Try fetching terminal theme.
        _TERMINAL_THEME = _get_standard_colors(_TTY_OUTPUT, _TTY_INPUT)

650 

651 

def _find_tty():
    """
    Locate streams attached to the controlling terminal and store them
    in `_TTY_OUTPUT` / `_TTY_INPUT`.

    First tries to open the terminal device directly (``CONIN$``/``CONOUT$``
    on Windows, ``/dev/tty`` elsewhere). If that fails with `OSError` or
    `AttributeError`, falls back to whichever of the canonical
    stderr/stdout/stdin streams are TTYs; a global stays `None` when
    nothing suitable is found.

    """

    global _TTY_OUTPUT, _TTY_INPUT

    _TTY_OUTPUT = _TTY_INPUT = None

    # Owns every stream opened below, so that a partial failure
    # closes what was already opened.
    stack = contextlib.ExitStack()
    try:
        if os.name == "nt":
            console_in = io._WindowsConsoleIO("CONIN$", "r")  # type: ignore
            tty_in = stack.enter_context(
                io.TextIOWrapper(console_in, encoding="utf-8")
            )
            console_out = io._WindowsConsoleIO("CONOUT$", "w")  # type: ignore
            tty_out = stack.enter_context(
                io.TextIOWrapper(console_out, encoding="utf-8")
            )
        else:
            raw = io.FileIO(os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY), "w+")
            raw.name = "/dev/tty"
            # A single read-write stream serves as both input and output.
            tty_in = tty_out = stack.enter_context(io.TextIOWrapper(raw))
    except (OSError, AttributeError):
        # No terminal device available; fall through to standard streams.
        stack.close()
    except BaseException:
        stack.close()
        raise
    else:
        # Keep the streams open for the lifetime of the process.
        atexit.register(stack.close)
        _TTY_INPUT, _TTY_OUTPUT = tty_in, tty_out
        return

    _TTY_OUTPUT = next(
        (
            candidate
            for candidate in (__stderr, __stdout)
            if candidate is not None and _output_is_tty(candidate)
        ),
        None,
    )
    if __stdin is not None and _input_is_tty(__stdin):
        _TTY_INPUT = __stdin

690 

691 

def _get_standard_colors(
    ostream: _t.TextIO, istream: _t.TextIO
) -> TerminalTheme | None:
    """
    Ask the terminal for its foreground, background and 16 palette colors
    via OSC 10 / 11 / 4 queries and assemble them into a `TerminalTheme`.

    Returns `None` when queries are disabled through the
    ``YUIO_DISABLE_OSC_QUERIES`` environment variable, when the terminal gives
    no or malformed response, when any of the first eight palette colors
    is missing, or when any error occurs.

    """

    if "YUIO_DISABLE_OSC_QUERIES" in os.environ:
        return None

    def channels(hex_values):
        # Each component is reported with 2 to 4 hex digits;
        # keep only the 8 most significant bits of every one.
        return tuple(int(v, 16) // 16 ** (len(v) - 2) for v in hex_values)

    try:
        palette_query = "".join(
            [f"\x1b]4;{i};?\x1b\\" for i in range(16)]
        )
        reply = _query_term(
            ostream, istream, "\x1b]10;?\x1b\\\x1b]11;?\x1b\\" + palette_query
        )
        if not reply:
            return None

        # Foreground comes first.

        fg_match = re.match(
            r"^\x1b]10;rgb:([0-9a-f]{2,4})/([0-9a-f]{2,4})/([0-9a-f]{2,4})(?:\x1b\\|\a)",
            reply,
            re.IGNORECASE,
        )
        if fg_match is None:  # pragma: no cover
            return None

        foreground = yuio.color.ColorValue.from_rgb(*channels(fg_match.groups()))
        reply = reply[fg_match.end() :]

        # Then background, which also determines lightness.

        bg_match = re.match(
            r"^\x1b]11;rgb:([0-9a-f]{2,4})/([0-9a-f]{2,4})/([0-9a-f]{2,4})(?:\x1b\\|\a)",
            reply,
            re.IGNORECASE,
        )
        if bg_match is None:  # pragma: no cover
            return None

        red, green, blue = channels(bg_match.groups())
        background = yuio.color.ColorValue.from_rgb(red, green, blue)
        luma = (0.2627 * red + 0.6780 * green + 0.0593 * blue) / 256

        if luma <= 0.2:
            lightness = Lightness.DARK
        elif luma >= 0.85:
            lightness = Lightness.LIGHT
        else:
            lightness = Lightness.UNKNOWN

        reply = reply[bg_match.end() :]

        # Whatever is left must be a sequence of palette entries.

        palette = {}

        while reply:
            entry = re.match(
                r"^\x1b]4;(\d+);rgb:([0-9a-f]{2,4})/([0-9a-f]{2,4})/([0-9a-f]{2,4})(?:\x1b\\|\a)",
                reply,
                re.IGNORECASE,
            )
            if entry is None:  # pragma: no cover
                return None

            index, *components = entry.groups()
            palette[int(index)] = yuio.color.ColorValue.from_rgb(
                *channels(components)
            )

            reply = reply[entry.end() :]

        # Base colors are mandatory.
        if not all(k in palette for k in range(8)):
            return None

        # Bright variants are optional and default to their base color.
        named = {}
        base_names = (
            "black",
            "red",
            "green",
            "yellow",
            "blue",
            "magenta",
            "cyan",
            "white",
        )
        for k, name in enumerate(base_names):
            named[name] = palette[k]
            named["bright_" + name] = palette.get(8 + k, palette[k])

        return TerminalTheme(
            background=background,
            foreground=foreground,
            lightness=lightness,
            **named,
        )

    except Exception:  # pragma: no cover
        return None

792 

793 

def _query_term(ostream: _t.TextIO, istream: _t.TextIO, query: str) -> str | None:
    """
    Send `query` to the terminal and collect its response.

    A DA1 request (``CSI c``) is appended to the query; since virtually all
    terminals answer it, its reply marks the end of the response.

    :param ostream:
        stream to write the query to.
    :param istream:
        stream to read the response from; switched to raw mode while querying.
    :param query:
        escape sequences to send.
    :returns:
        everything the terminal sent before the DA1 reply, or `None`
        if the response was malformed or any error (including a read timeout)
        occurred. Errors are logged, not raised.

    """

    try:
        # Lock the keyboard (set Keyboard Action Mode), so that user's
        # keystrokes don't get mixed with the response.
        ostream.write("\x1b[2h")
        ostream.flush()

        with _enter_raw_mode(ostream, istream):
            _flush_input_buffer(ostream, istream)

            # It is important that we unlock keyboard before exiting `cbreak`,
            # hence the nested `try`.
            try:
                # Append a DA1 query, as virtually all terminals support it.
                ostream.write(query + "\x1b[c")
                ostream.flush()

                buf = _read_keycode(ostream, istream, timeout=0.250)
                if not buf.startswith("\x1b"):
                    yuio._logger.warning("_query_term invalid response: %r", buf)
                    return None

                # Read till we find a DA1 response.
                while not re.search(r"\x1b\[\?.*?c", buf):
                    buf += _read_keycode(ostream, istream, timeout=0.250)

                return buf[: buf.index("\x1b[?")]
            finally:
                _flush_input_buffer(ostream, istream)

                # Release the keyboard. Keyboard Action Mode is reset with
                # `CSI 2 l`; `CSI 2 i` is a Media Copy request and would
                # leave the keyboard locked.
                ostream.write("\x1b[2l")
                ostream.flush()
    except Exception:  # pragma: no cover
        yuio._logger.warning("_query_term error", exc_info=True)
        return None
    finally:
        # Release the keyboard once more: the inner release is skipped
        # when entering raw mode itself fails.
        ostream.write("\x1b[2l")
        ostream.flush()

833 

834 

def _detect_explicit_color_settings() -> ColorSupport | bool | None:
    """
    Find out whether the user explicitly requested a color mode.

    Sources are checked in order of increasing priority: ``FORCE_COLOR``,
    then ``NO_COLOR``/``FORCE_NO_COLOR``, then command line flags left to right
    (``--color``, ``--force-color``, ``--no-color``, ``--force-no-color``,
    ``--color=<value>``, ``--colors=<value>``). Unrecognized values are ignored.

    :returns:
        `True`/`False` for a plain enable/disable request, a `ColorSupport`
        level for an exact request, `None` if nothing was requested.

    """

    env = os.environ

    setting = True if "FORCE_COLOR" in env else None
    if "NO_COLOR" in env or "FORCE_NO_COLOR" in env:
        setting = False

    # Note: we don't rely on argparse to parse flags and send them to us
    # because these functions can be called before parsing arguments.
    for arg in sys.argv[1:]:
        if arg in ("--color", "--force-color"):
            setting = True
            continue
        if arg in ("--no-color", "--force-no-color"):
            setting = False
            continue
        if not arg.startswith(("--color=", "--colors=")):
            continue

        # Normalize value, so that `ansi-256`, `ANSI_256` and `ansi256` all match.
        _, _, raw_value = arg.partition("=")
        value = raw_value.replace("_", "").replace("-", "").casefold()

        if value in ["1", "yes", "true"]:
            setting = True
        elif value in ["0", "no", "false"]:
            setting = False
        elif value == "ansi":
            setting = ColorSupport.ANSI
        elif value == "ansi256":
            setting = ColorSupport.ANSI_256
        elif value == "ansitrue":
            setting = ColorSupport.ANSI_TRUE

    return setting

870 

871 

def _detect_color_support_from_env() -> ColorSupport:
    """
    Guess the level of color support from the environment: CI detection first,
    then ``COLORTERM`` and ``TERM`` variables.

    On Windows (outside of CI) this always reports `ColorSupport.NONE`.

    """

    if detect_ci():
        return detect_ci_color_support()
    if os.name == "nt":
        return ColorSupport.NONE

    term = os.environ.get("TERM", "").lower()
    colorterm = os.environ.get("COLORTERM", "").lower()

    if colorterm in ("truecolor", "24bit") or term == "xterm-kitty":
        return ColorSupport.ANSI_TRUE

    if colorterm in ("yes", "true") or "256color" in term or term == "screen":
        # `xterm-256color` with a `wslinfo` executable on PATH
        # is treated as true-color capable.
        upgrade = (
            os.name == "posix" and term == "xterm-256color" and shutil.which("wslinfo")
        )
        return ColorSupport.ANSI_TRUE if upgrade else ColorSupport.ANSI_256

    if any(hint in term for hint in ("linux", "color", "ansi", "xterm")):
        return ColorSupport.ANSI

    return ColorSupport.NONE

891 

892 

def _is_tty(stream: _t.TextIO | None) -> bool:
    """
    Check whether `stream` reports itself as a TTY.

    A missing stream, or a stream whose ``isatty`` raises, is not a TTY.

    """

    if stream is None:
        return False

    try:
        return stream.isatty()
    except Exception:  # pragma: no cover
        return False

898 

899 

def _input_is_tty(stream: _t.TextIO | None) -> bool:
    """
    Check whether `stream` is a TTY that can be read from.

    Any error raised while probing the stream counts as "no".

    """

    if stream is None:
        return False

    try:
        attached = _is_tty(stream)
        return attached and stream.readable()
    except Exception:  # pragma: no cover
        return False

905 

906 

def _output_is_tty(stream: _t.TextIO | None) -> bool:
    """
    Check whether `stream` is a TTY that can be written to.

    Any error raised while probing the stream counts as "no".

    """

    if stream is None:
        return False

    try:
        attached = _is_tty(stream)
        return attached and stream.writable()
    except Exception:  # pragma: no cover
        return False

912 

913 

@contextlib.contextmanager
def _modify_keyboard(
    ostream: _t.TextIO,
    bracketed_paste: bool = False,
    modify_keyboard: bool = False,
):
    """
    Context manager that enables optional keyboard-related terminal modes
    for the duration of the ``with`` block and reverts them afterwards.

    :param ostream:
        stream to write control sequences to.
    :param bracketed_paste:
        enable bracketed paste (``CSI ? 2004 h``), disable it on exit.
    :param modify_keyboard:
        push keyboard enhancement flags (``CSI > 1 u``), pop them on exit.

    Nothing is written if neither mode is requested.

    """

    prologue = []
    epilogue = []
    if bracketed_paste:
        prologue.append("\x1b[?2004h")
        epilogue.append("\x1b[?2004l")
    if modify_keyboard:
        prologue.append("\x1b[>1u")
        # Pop only what we pushed: an unconditional pop would discard
        # keyboard flags that were set up by someone else.
        epilogue.append("\x1b[<u")

    if prologue:
        ostream.write("".join(prologue))
        ostream.flush()
    try:
        yield
    finally:
        if epilogue:
            ostream.write("".join(epilogue))
            ostream.flush()

938 

939 

# Platform-specific code for working with terminals.
#
# Every branch defines the same set of helpers: `_is_foreground`,
# `_enter_raw_mode`, `_read_keycode`, `_flush_input_buffer`,
# `_enable_vt_processing` and `_pause`.
if os.name == "posix":
    import select
    import signal
    import termios
    import tty

    def _is_foreground(stream: _t.TextIO | None) -> bool:
        """
        Check that our process group is the foreground process group
        of the terminal behind `stream`. Any error counts as "no".

        """

        try:
            return stream is not None and os.getpgrp() == os.tcgetpgrp(stream.fileno())
        except Exception:  # pragma: no cover
            return False

    @contextlib.contextmanager
    def _enter_raw_mode(
        ostream: _t.TextIO,
        istream: _t.TextIO,
        bracketed_paste: bool = False,
        modify_keyboard: bool = False,
    ):
        """
        Context manager that disables echo, line editing and signal keys
        on `istream`, applies `_modify_keyboard` settings to `ostream`,
        and restores previous terminal attributes on exit.

        """

        prev_mode = termios.tcgetattr(istream)
        new_mode = prev_mode.copy()
        new_mode[tty.LFLAG] &= ~(
            termios.ECHO  # Don't print back what user types.
            | termios.ICANON  # Disable line editing.
            | termios.ISIG  # Disable signals on C-c and C-z.
        )
        # `copy()` above is shallow; copy control characters too,
        # so that `prev_mode` stays intact.
        new_mode[tty.CC] = new_mode[tty.CC].copy()
        # Make reads return as soon as a single byte is available.
        new_mode[tty.CC][termios.VMIN] = 1
        new_mode[tty.CC][termios.VTIME] = 0
        termios.tcsetattr(istream, termios.TCSAFLUSH, new_mode)

        try:
            with _modify_keyboard(ostream, bracketed_paste, modify_keyboard):
                yield
        finally:
            termios.tcsetattr(istream, termios.TCSAFLUSH, prev_mode)

    def _read_keycode(
        ostream: _t.TextIO, istream: _t.TextIO, timeout: float = 0
    ) -> str:
        """
        Read everything that's currently available on `istream`.

        With a non-zero `timeout`, raises `TimeoutError` if no input arrives
        within that many seconds; with zero timeout, blocks until input arrives.

        """

        if timeout and not bool(select.select([istream], [], [], timeout)[0]):
            raise TimeoutError()
        key = os.read(istream.fileno(), 128)
        # Drain whatever else is immediately available, so that long
        # sequences are not split between calls.
        while bool(select.select([istream], [], [], 0)[0]):
            key += os.read(istream.fileno(), 128)

        return key.decode(istream.encoding, errors="replace")

    def _flush_input_buffer(ostream: _t.TextIO, istream: _t.TextIO):
        """
        No-op on this platform.

        """

        pass

    def _enable_vt_processing(ostream: _t.TextIO) -> bool:
        """
        Always `False` on this platform.

        """

        return False  # This is a windows functionality

    def _pause():
        """
        Send `SIGTSTP` to the current process.

        """

        os.kill(os.getpid(), signal.SIGTSTP)

elif os.name == "nt":
    import ctypes
    import ctypes.wintypes
    import msvcrt

    _FlushConsoleInputBuffer = ctypes.windll.kernel32.FlushConsoleInputBuffer
    _FlushConsoleInputBuffer.argtypes = [ctypes.wintypes.HANDLE]
    _FlushConsoleInputBuffer.restype = ctypes.wintypes.BOOL

    _GetConsoleMode = ctypes.windll.kernel32.GetConsoleMode
    _GetConsoleMode.argtypes = [ctypes.wintypes.HANDLE, ctypes.wintypes.LPDWORD]
    _GetConsoleMode.restype = ctypes.wintypes.BOOL

    _SetConsoleMode = ctypes.windll.kernel32.SetConsoleMode
    _SetConsoleMode.argtypes = [ctypes.wintypes.HANDLE, ctypes.wintypes.DWORD]
    _SetConsoleMode.restype = ctypes.wintypes.BOOL

    _GetConsoleWindow = ctypes.windll.kernel32.GetConsoleWindow
    _GetConsoleWindow.argtypes = []
    _GetConsoleWindow.restype = ctypes.wintypes.HWND

    _IsWindowVisible = ctypes.windll.user32.IsWindowVisible
    _IsWindowVisible.argtypes = [ctypes.wintypes.HWND]
    # Declare the return type of `IsWindowVisible` itself; previously this line
    # re-assigned `_SetConsoleMode.restype` by mistake.
    _IsWindowVisible.restype = ctypes.wintypes.BOOL

    _ReadConsoleW = ctypes.windll.kernel32.ReadConsoleW
    _ReadConsoleW.argtypes = [
        ctypes.wintypes.HANDLE,
        ctypes.wintypes.LPVOID,
        ctypes.wintypes.DWORD,
        ctypes.wintypes.LPDWORD,
        ctypes.wintypes.LPVOID,
    ]
    _ReadConsoleW.restype = ctypes.wintypes.BOOL

    _ENABLE_PROCESSED_OUTPUT = 0x0001
    _ENABLE_WRAP_AT_EOL_OUTPUT = 0x0002
    _ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x0004
    _ENABLE_VIRTUAL_TERMINAL_INPUT = 0x0200

    # OS handle of the console input; resolved lazily in `_enter_raw_mode`.
    _ISTREAM_HANDLE = None

    def _is_foreground(stream: _t.TextIO | None) -> bool:
        """
        Check that the process has a console window and that it is visible.
        The `stream` argument is not used on this platform.

        """

        window = _GetConsoleWindow()
        if not window:
            return False
        # Convert WinAPI `BOOL` to a real `bool`, as the annotation promises.
        return bool(_IsWindowVisible(window))

    @contextlib.contextmanager
    def _enter_raw_mode(
        ostream: _t.TextIO,
        istream: _t.TextIO,
        bracketed_paste: bool = False,
        modify_keyboard: bool = False,
    ):
        """
        Context manager that switches console input to virtual terminal input
        mode, applies `_modify_keyboard` settings to `ostream`, and restores
        the previous console mode on exit.

        Raises `OSError` (via `ctypes.WinError`) if a console mode
        can't be read or set.

        """

        global _ISTREAM_HANDLE

        if _ISTREAM_HANDLE is None:
            _prepare_tty()
            _ISTREAM_HANDLE = msvcrt.get_osfhandle((_TTY_INPUT or __stdin).fileno())  # type: ignore

        mode = ctypes.wintypes.DWORD()
        success = _GetConsoleMode(_ISTREAM_HANDLE, ctypes.byref(mode))
        if not success:
            raise ctypes.WinError()
        success = _SetConsoleMode(_ISTREAM_HANDLE, _ENABLE_VIRTUAL_TERMINAL_INPUT)
        if not success:
            raise ctypes.WinError()

        try:
            with _modify_keyboard(ostream, bracketed_paste, modify_keyboard):
                yield
        finally:
            success = _SetConsoleMode(_ISTREAM_HANDLE, mode)
            if not success:
                raise ctypes.WinError()

    def _read_keycode(
        ostream: _t.TextIO, istream: _t.TextIO, timeout: float = 0
    ) -> str:
        """
        Read up to 16 UTF-16 code units from the console input.

        The `timeout` argument is accepted for interface parity
        but is not used on this platform.

        """

        assert _ISTREAM_HANDLE is not None

        CHAR16 = ctypes.wintypes.WCHAR * 16

        n_read = ctypes.wintypes.DWORD()
        buffer = CHAR16()

        success = _ReadConsoleW(
            _ISTREAM_HANDLE,
            ctypes.byref(buffer),
            16,
            ctypes.byref(n_read),
            0,
        )
        if not success:
            raise ctypes.WinError()

        return buffer.value

    def _flush_input_buffer(ostream: _t.TextIO, istream: _t.TextIO):
        """
        Discard pending console input. Requires that `_enter_raw_mode`
        has already resolved the input handle.

        """

        assert _ISTREAM_HANDLE is not None

        success = _FlushConsoleInputBuffer(_ISTREAM_HANDLE)
        if not success:
            raise ctypes.WinError()

    def _enable_vt_processing(ostream: _t.TextIO) -> bool:
        """
        Try to turn on virtual terminal processing for `ostream`'s console.

        Returns `False` on Windows versions older than 10 build 14931,
        or if the console mode can't be set.

        """

        try:
            version = sys.getwindowsversion()
            if version.major < 10 or version.build < 14931:
                return False

            handle = msvcrt.get_osfhandle(ostream.fileno())
            return bool(
                _SetConsoleMode(
                    handle,
                    _ENABLE_PROCESSED_OUTPUT
                    | _ENABLE_WRAP_AT_EOL_OUTPUT
                    | _ENABLE_VIRTUAL_TERMINAL_PROCESSING,
                )
            )
        except Exception:  # pragma: no cover
            return False

    def _pause():
        """
        No-op on this platform.

        """

        pass

else:  # pragma: no cover

    def _is_foreground(stream: _t.TextIO | None) -> bool:
        """
        Always `False`: terminal interaction is not supported on this platform.

        """

        return False

    @contextlib.contextmanager
    def _enter_raw_mode(
        ostream: _t.TextIO,
        istream: _t.TextIO,
        bracketed_paste: bool = False,
        modify_keyboard: bool = False,
    ):
        """
        Always raises `OSError`: not supported on this platform.

        """

        raise OSError("not supported")
        # Unreachable; keeps this function a generator for `contextmanager`.
        yield

    def _read_keycode(
        ostream: _t.TextIO, istream: _t.TextIO, timeout: float = 0
    ) -> str:
        """
        Always raises `OSError`: not supported on this platform.

        """

        raise OSError("not supported")

    def _flush_input_buffer(ostream: _t.TextIO, istream: _t.TextIO):
        """
        Always raises `OSError`: not supported on this platform.

        """

        raise OSError("not supported")

    def _enable_vt_processing(ostream: _t.TextIO) -> bool:
        """
        Always raises `OSError`: not supported on this platform.

        """

        raise OSError("not supported")

    def _pause():
        """
        Always raises `OSError`: not supported on this platform.

        """

        raise OSError("not supported")