Coverage for yuio / exec.py: 87%
181 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-05 11:41 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-05 11:41 +0000
1# Yuio project, MIT license.
2#
3# https://github.com/taminomara/yuio/
4#
5# You're free to copy this file to your project and edit it for your needs,
6# just keep this copyright line please :3
8"""
9This module provides helpers to run subprocesses and get their output.
11It handles subprocess' stderr and stdout in a way that doesn't break
12loggers from :mod:`yuio.io`.
14.. autofunction:: exec
16.. autoclass:: ExecError
18"""
20from __future__ import annotations
22import contextlib
23import logging
24import os
25import pathlib
26import select
27import selectors
28import subprocess
29import threading
31from typing import TYPE_CHECKING
33if TYPE_CHECKING:
34 import typing_extensions as _t
35else:
36 from yuio import _typing as _t
38__all__ = [
39 "ExecError",
40 "exec",
41]
43_logger = logging.getLogger(__name__)
46class ExecError(subprocess.CalledProcessError):
47 """
48 Raised when executed command returns a non-zero status.
50 .. py:data:: returncode
51 :type: int
53 Return code of the called command.
55 .. py:data:: cmd
56 :type: tuple[str | pathlib.Path, ...]
58 Initial `args` passed to the :func:`exec`.
60 .. py:data:: output
61 stdout
62 :type: str | bytes | None
64 Captured stdout.
66 .. py:data:: stderr
67 :type: str | bytes | None
69 Captured stderr.
71 """
73 def __str__(self):
74 res = super().__str__()
75 if stderr := getattr(self, "stderr", None):
76 if isinstance(stderr, bytes):
77 stderr = stderr.decode(errors="replace")
78 res += "\n\nStderr:\n" + stderr
79 return res
82@_t.overload
83def exec(
84 *args: str | pathlib.Path,
85 cwd: str | pathlib.Path | None = None,
86 env: dict[str, str] | None = None,
87 input: str | None = None,
88 capture_io: _t.Literal[True] = True,
89 logger: logging.Logger | logging.LoggerAdapter[_t.Any] | str | None = None,
90 level: int = logging.DEBUG,
91 text: _t.Literal[True] = True,
92) -> str: ...
93@_t.overload
94def exec(
95 *args: str | pathlib.Path,
96 cwd: str | pathlib.Path | None = None,
97 env: dict[str, str] | None = None,
98 input: bytes | None = None,
99 capture_io: _t.Literal[True] = True,
100 logger: logging.Logger | logging.LoggerAdapter[_t.Any] | str | None = None,
101 level: int = logging.DEBUG,
102 text: _t.Literal[False],
103) -> bytes: ...
104@_t.overload
105def exec(
106 *args: str | pathlib.Path,
107 cwd: str | pathlib.Path | None = None,
108 env: dict[str, str] | None = None,
109 input: str | None = None,
110 capture_io: _t.Literal[False],
111 text: _t.Literal[True] = True,
112) -> None: ...
113@_t.overload
114def exec(
115 *args: str | pathlib.Path,
116 cwd: str | pathlib.Path | None = None,
117 env: dict[str, str] | None = None,
118 input: bytes | None = None,
119 capture_io: _t.Literal[False],
120 text: _t.Literal[False],
121) -> None: ...
122@_t.overload
123def exec(
124 *args: str | pathlib.Path,
125 cwd: None | str | pathlib.Path = None,
126 env: dict[str, str] | None = None,
127 capture_io: bool = True,
128 input: None | str | bytes = None,
129 logger: logging.Logger | logging.LoggerAdapter[_t.Any] | str | None = None,
130 level: int | None = None,
131 text: bool = False,
132) -> str | bytes | None: ...
133def exec(
134 *args: str | pathlib.Path,
135 cwd: None | str | pathlib.Path = None,
136 env: dict[str, str] | None = None,
137 capture_io: bool = True,
138 input: None | str | bytes = None,
139 logger: logging.Logger | logging.LoggerAdapter[_t.Any] | str | None = None,
140 level: int | None = None,
141 text: bool = True,
142) -> str | bytes | None:
143 """
144 Run an executable and return its stdout.
146 Command's stderr is interactively printed to the log.
148 :param args:
149 command arguments.
150 :param cwd:
151 set the current directory before the command is executed.
152 :param env:
153 define the environment variables for the command.
154 :param input:
155 string with command's stdin. If `text` is set to :data:`False`, this should
156 be :class:`bytes`, otherwise it should be a :class:`str`.
157 :param capture_io:
158 if set to :data:`False`, process' stdout and stderr are not captured;
159 `logger` and `level` arguments can't be given in this case, and this
160 function returns :data:`None` instead of process' output.
161 :param logger:
162 logger that will be used for logging command's output. Default is to log
163 to ``yuio.exec``.
164 :param level:
165 logging level for stderr outputs. Default is :data:`logging.DEBUG`.
166 :param text:
167 if set to :data:`False`, stdout is returned as :class:`bytes`.
168 :returns:
169 string (or bytes) with command's stdout, or :data:`None` if `capture_io`
170 is :data:`False`.
171 :raises:
172 If the command fails, a :class:`~subprocess.CalledProcessError` is raised.
173 If command can't be started, raises :class:`OSError`.
175 """
177 if not capture_io:
178 for name, param in [
179 ("logger", logger),
180 ("level", level),
181 ]:
182 if param is not None:
183 raise ValueError(f"{name} can't be specified when capture_io is False")
185 level = level if level is not None else logging.DEBUG
187 if logger is None:
188 logger = _logger
189 elif isinstance(logger, str):
190 logger = logging.getLogger(logger)
192 logger.log(level, " ".join(map(str, args)))
194 with contextlib.ExitStack() as s:
195 if not capture_io:
196 import yuio.io
198 s.enter_context(yuio.io.SuspendOutput())
200 process = s.enter_context(
201 subprocess.Popen(
202 args,
203 cwd=cwd,
204 env=env,
205 stdout=subprocess.PIPE if capture_io else None,
206 stderr=subprocess.PIPE if capture_io else None,
207 text=text,
208 stdin=(
209 (subprocess.DEVNULL if input is None else subprocess.PIPE)
210 if capture_io
211 else None
212 ),
213 )
214 )
216 stdout, stderr = _process_io(process, capture_io, input, logger, level)
218 process.wait()
220 if not capture_io:
221 stdout_str = None
222 elif text:
223 stdout_str = "".join(_t.cast(list[str], stdout))
224 else:
225 stdout_str = b"".join(_t.cast(list[bytes], stdout))
227 if process.returncode != 0:
228 if not capture_io:
229 stderr_str = None
230 elif text:
231 stderr_str = "".join(_t.cast(list[str], stderr))
232 else:
233 stderr_str = b"".join(_t.cast(list[bytes], stderr))
235 raise ExecError(
236 process.returncode, args, output=stdout_str, stderr=stderr_str
237 )
239 return stdout_str
242def _process_io(
243 process: subprocess.Popen[_t.Any],
244 capture_io: bool,
245 input: str | bytes | None,
246 logger: logging.Logger | logging.LoggerAdapter[_t.Any],
247 level: int,
248):
249 if not capture_io:
250 return _process_io_nocap(process, input, logger, level)
251 elif os.name == "nt":
252 return _process_io_threads(process, input, logger, level)
253 else:
254 return _process_io_selectors(process, input, logger, level)
257def _process_io_threads(
258 process: subprocess.Popen[_t.Any],
259 input: str | bytes | None,
260 logger: logging.Logger | logging.LoggerAdapter[_t.Any],
261 level: int,
262):
263 stdout = []
264 stderr = []
266 def read_stderr(fh: _t.IO[_t.Any]):
267 last_line = ""
268 while True:
269 text = fh.read()
270 if not text:
271 fh.close()
272 return
273 stderr.append(text)
274 if isinstance(text, bytes):
275 text = text.decode(errors="replace")
276 for line in text.splitlines(keepends=True):
277 if not line.endswith("\n"):
278 last_line += line
279 else:
280 logger.log(level, "-> %s", last_line + line.rstrip("\r\n"))
281 last_line = ""
283 def read_stdout(fh: _t.IO[_t.Any]):
284 while True:
285 text = fh.read()
286 if not text:
287 fh.close()
288 return
289 stdout.append(text)
291 assert process.stdout
292 stdout_thread = threading.Thread(
293 target=read_stdout,
294 args=(process.stdout,),
295 name="yuio stdout handler for sub-process",
296 )
297 stdout_thread.daemon = True
298 stdout_thread.start()
300 assert process.stderr
301 stderr_thread = threading.Thread(
302 target=read_stderr,
303 args=(process.stderr,),
304 name="yuio stderr handler for sub-process",
305 )
306 stderr_thread.daemon = True
307 stderr_thread.start()
309 if input is not None:
310 assert process.stdin is not None
311 try:
312 process.stdin.write(input)
313 process.stdin.flush()
314 except BrokenPipeError:
315 pass
316 process.stdin.close()
318 stdout_thread.join()
319 stderr_thread.join()
321 return stdout, stderr
324# From subprocess implementation: "poll/select have the advantage of not requiring
325# any extra file descriptor, contrarily to epoll/kqueue (also, they require a single
326# syscall)."
327if hasattr(selectors, "PollSelector"):
328 _Selector = selectors.PollSelector
329else:
330 _Selector = selectors.SelectSelector
333def _process_io_selectors(
334 process: subprocess.Popen[_t.Any],
335 input: str | bytes | None,
336 logger: logging.Logger | logging.LoggerAdapter[_t.Any],
337 level: int,
338):
339 assert process.stdout
340 assert process.stderr
341 if input is not None:
342 assert process.stdin
343 if process.text_mode: # type: ignore
344 input = input.encode(process.stdin.encoding, process.stdin.errors) # type: ignore
345 input_data = memoryview(input) # type: ignore
346 else:
347 input_data = None
349 stdout: list[bytes] = []
350 stderr: list[bytes] = []
352 last_line = ""
354 def read_stderr(selector: selectors.BaseSelector, fd: int, fh: _t.IO[_t.Any]):
355 nonlocal last_line
356 text = os.read(fd, 32 * 1024)
357 if not text:
358 selector.unregister(fd)
359 fh.close()
360 return
361 stderr.append(text)
362 if isinstance(text, bytes):
363 text = text.decode(errors="replace")
364 for line in text.splitlines(keepends=True):
365 if not line.endswith("\n"):
366 last_line += line
367 else:
368 logger.log(level, "-> %s", last_line + line.rstrip("\r\n"))
369 last_line = ""
371 def read_stdout(selector: selectors.BaseSelector, fd: int, fh: _t.IO[_t.Any]):
372 text = os.read(fd, 32 * 1024)
373 if not text:
374 selector.unregister(fd)
375 fh.close()
376 return
377 stdout.append(text)
379 index = 0
381 def write_stdin(selector: selectors.BaseSelector, fd: int, fh: _t.IO[_t.Any]):
382 nonlocal index
383 assert input_data is not None
384 try:
385 index += os.write(fd, input_data[index : index + select.PIPE_BUF])
386 except BrokenPipeError:
387 selector.unregister(fd)
388 fh.close()
389 return
390 if index >= len(input_data):
391 selector.unregister(fd)
392 fh.close()
394 with _Selector() as selector:
395 selector.register(process.stderr, selectors.EVENT_READ, read_stderr)
396 selector.register(process.stdout, selectors.EVENT_READ, read_stdout)
397 if process.stdin is not None:
398 selector.register(process.stdin, selectors.EVENT_WRITE, write_stdin)
399 try:
400 process.stdin.flush()
401 except BrokenPipeError:
402 pass
404 while selector.get_map():
405 for key, _ in selector.select():
406 key.data(selector, key.fd, key.fileobj)
408 if process.stdin is not None:
409 assert process.stdin.closed
411 return _decode(process, stdout, process.stdout), _decode(
412 process, stderr, process.stderr
413 )
416def _decode(
417 process: subprocess.Popen[_t.Any], lines: list[bytes], stream: _t.IO[_t.Any]
418):
419 if not process.text_mode: # type: ignore
420 return lines
421 raw = b"".join(lines)
422 text = raw.decode(stream.encoding, stream.errors) # type: ignore
423 return [text.replace("\r\n", "\n").replace("\r", "\n")]
426def _process_io_nocap(
427 process: subprocess.Popen[_t.Any],
428 input: str | bytes | None,
429 logger: logging.Logger | logging.LoggerAdapter[_t.Any],
430 level: int,
431):
432 if input is not None:
433 assert process.stdin is not None
434 try:
435 process.stdin.write(input)
436 process.stdin.flush()
437 except BrokenPipeError:
438 pass
439 process.stdin.close()
441 return None, None