Coverage for yuio / exec.py: 87%
180 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-04 10:05 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-04 10:05 +0000
1# Yuio project, MIT license.
2#
3# https://github.com/taminomara/yuio/
4#
5# You're free to copy this file to your project and edit it for your needs,
6# just keep this copyright line please :3
8"""
9This module provides helpers to run subprocesses and get their output.
11It handles subprocess' stderr and stdout in a way that doesn't break
12loggers from :mod:`yuio.io`.
14.. autofunction:: exec
16.. autoclass:: ExecError
18"""
20from __future__ import annotations
22import contextlib
23import logging
24import os
25import pathlib
26import select
27import selectors
28import subprocess
29import threading
31from yuio import _typing as _t
33__all__ = [
34 "ExecError",
35 "exec",
36]
38_logger = logging.getLogger(__name__)
41class ExecError(subprocess.CalledProcessError):
42 """
43 Raised when executed command returns a non-zero status.
45 .. py:data:: returncode
46 :type: int
48 Return code of the called command.
50 .. py:data:: cmd
51 :type: tuple[str | pathlib.Path, ...]
53 Initial ``args`` passed to the :func:`exec`.
55 .. py:data:: output
56 stdout
57 :type: str | bytes | None
59 Captured stdout.
61 .. py:data:: stderr
62 :type: str | bytes | None
64 Captured stderr.
66 """
68 def __str__(self):
69 res = super().__str__()
70 if stderr := getattr(self, "stderr", None):
71 if isinstance(stderr, bytes):
72 stderr = stderr.decode(errors="replace")
73 res += "\n\nStderr:\n" + stderr
74 return res
77@_t.overload
78def exec(
79 *args: str | pathlib.Path,
80 cwd: str | pathlib.Path | None = None,
81 env: dict[str, str] | None = None,
82 input: str | None = None,
83 capture_io: _t.Literal[True] = True,
84 logger: logging.Logger | logging.LoggerAdapter[_t.Any] | str | None = None,
85 level: int = logging.DEBUG,
86 text: _t.Literal[True] = True,
87) -> str: ...
88@_t.overload
89def exec(
90 *args: str | pathlib.Path,
91 cwd: str | pathlib.Path | None = None,
92 env: dict[str, str] | None = None,
93 input: bytes | None = None,
94 capture_io: _t.Literal[True] = True,
95 logger: logging.Logger | logging.LoggerAdapter[_t.Any] | str | None = None,
96 level: int = logging.DEBUG,
97 text: _t.Literal[False],
98) -> bytes: ...
99@_t.overload
100def exec(
101 *args: str | pathlib.Path,
102 cwd: str | pathlib.Path | None = None,
103 env: dict[str, str] | None = None,
104 input: str | None = None,
105 capture_io: _t.Literal[False],
106 text: _t.Literal[True] = True,
107) -> None: ...
108@_t.overload
109def exec(
110 *args: str | pathlib.Path,
111 cwd: str | pathlib.Path | None = None,
112 env: dict[str, str] | None = None,
113 input: bytes | None = None,
114 capture_io: _t.Literal[False],
115 text: _t.Literal[False],
116) -> None: ...
117@_t.overload
118def exec(
119 *args: str | pathlib.Path,
120 cwd: None | str | pathlib.Path = None,
121 env: dict[str, str] | None = None,
122 capture_io: bool = True,
123 input: None | str | bytes = None,
124 logger: logging.Logger | logging.LoggerAdapter[_t.Any] | str | None = None,
125 level: int | None = None,
126 text: bool = False,
127) -> str | bytes | None: ...
128def exec(
129 *args: str | pathlib.Path,
130 cwd: None | str | pathlib.Path = None,
131 env: dict[str, str] | None = None,
132 capture_io: bool = True,
133 input: None | str | bytes = None,
134 logger: logging.Logger | logging.LoggerAdapter[_t.Any] | str | None = None,
135 level: int | None = None,
136 text: bool = True,
137) -> str | bytes | None:
138 """
139 Run an executable and return its stdout.
141 Command's stderr is interactively printed to the log.
143 :param args:
144 command arguments.
145 :param cwd:
146 set the current directory before the command is executed.
147 :param env:
148 define the environment variables for the command.
149 :param input:
150 string with command's stdin. If ``text`` is set to :data:`False`, this should
151 be :class:`bytes`, otherwise it should be a :class:`str`.
152 :param capture_io:
153 if set to :data:`False`, process' stdout and stderr are not captured;
154 ``logger`` and ``level`` arguments can't be given in this case, and this
155 function returns :data:`None` instead of process' output.
156 :param logger:
157 logger that will be used for logging command's output. Default is to log
158 to ``yuio.exec``.
159 :param level:
160 logging level for stderr outputs. Default is :data:`logging.DEBUG`.
161 :param text:
162 if set to :data:`False`, stdout is returned as :class:`bytes`.
163 :returns:
164 string (or bytes) with command's stdout, or :data:`None` if ``capture_io``
165 is :data:`False`.
166 :raises:
167 If the command fails, a :class:`~subprocess.CalledProcessError` is raised.
168 If command can't be started, raises :class:`OSError`.
170 """
172 if not capture_io:
173 for name, param in [
174 ("logger", logger),
175 ("level", level),
176 ]:
177 if param is not None:
178 raise ValueError(f"{name} can't be specified when capture_io is False")
180 level = level if level is not None else logging.DEBUG
182 if logger is None:
183 logger = _logger
184 elif isinstance(logger, str):
185 logger = logging.getLogger(logger)
187 logger.log(level, " ".join(map(str, args)))
189 with contextlib.ExitStack() as s:
190 if not capture_io:
191 import yuio.io
193 s.enter_context(yuio.io.SuspendOutput())
195 process = s.enter_context(
196 subprocess.Popen(
197 args,
198 cwd=cwd,
199 env=env,
200 stdout=subprocess.PIPE if capture_io else None,
201 stderr=subprocess.PIPE if capture_io else None,
202 text=text,
203 stdin=(
204 (subprocess.DEVNULL if input is None else subprocess.PIPE)
205 if capture_io
206 else None
207 ),
208 )
209 )
211 stdout, stderr = _process_io(process, capture_io, input, logger, level)
213 process.wait()
215 if not capture_io:
216 stdout_str = None
217 elif text:
218 stdout_str = "".join(_t.cast(list[str], stdout))
219 else:
220 stdout_str = b"".join(_t.cast(list[bytes], stdout))
222 if process.returncode != 0:
223 if not capture_io:
224 stderr_str = None
225 elif text:
226 stderr_str = "".join(_t.cast(list[str], stderr))
227 else:
228 stderr_str = b"".join(_t.cast(list[bytes], stderr))
230 raise ExecError(
231 process.returncode, args, output=stdout_str, stderr=stderr_str
232 )
234 return stdout_str
237def _process_io(
238 process: subprocess.Popen[_t.Any],
239 capture_io: bool,
240 input: str | bytes | None,
241 logger: logging.Logger | logging.LoggerAdapter[_t.Any],
242 level: int,
243):
244 if not capture_io:
245 return _process_io_nocap(process, input, logger, level)
246 elif os.name == "nt":
247 return _process_io_threads(process, input, logger, level)
248 else:
249 return _process_io_selectors(process, input, logger, level)
252def _process_io_threads(
253 process: subprocess.Popen[_t.Any],
254 input: str | bytes | None,
255 logger: logging.Logger | logging.LoggerAdapter[_t.Any],
256 level: int,
257):
258 stdout = []
259 stderr = []
261 def read_stderr(fh: _t.IO[_t.Any]):
262 last_line = ""
263 while True:
264 text = fh.read()
265 if not text:
266 fh.close()
267 return
268 stderr.append(text)
269 if isinstance(text, bytes):
270 text = text.decode(errors="replace")
271 for line in text.splitlines(keepends=True):
272 if not line.endswith("\n"):
273 last_line += line
274 else:
275 logger.log(level, "-> %s", last_line + line.rstrip("\r\n"))
276 last_line = ""
278 def read_stdout(fh: _t.IO[_t.Any]):
279 while True:
280 text = fh.read()
281 if not text:
282 fh.close()
283 return
284 stdout.append(text)
286 assert process.stdout
287 stdout_thread = threading.Thread(
288 target=read_stdout,
289 args=(process.stdout,),
290 name="yuio stdout handler for sub-process",
291 )
292 stdout_thread.daemon = True
293 stdout_thread.start()
295 assert process.stderr
296 stderr_thread = threading.Thread(
297 target=read_stderr,
298 args=(process.stderr,),
299 name="yuio stderr handler for sub-process",
300 )
301 stderr_thread.daemon = True
302 stderr_thread.start()
304 if input is not None:
305 assert process.stdin is not None
306 try:
307 process.stdin.write(input)
308 process.stdin.flush()
309 except BrokenPipeError:
310 pass
311 process.stdin.close()
313 stdout_thread.join()
314 stderr_thread.join()
316 return stdout, stderr
319# From subprocess implementation: "poll/select have the advantage of not requiring
320# any extra file descriptor, contrarily to epoll/kqueue (also, they require a single
321# syscall)."
322if hasattr(selectors, "PollSelector"):
323 _Selector = selectors.PollSelector
324else:
325 _Selector = selectors.SelectSelector
328def _process_io_selectors(
329 process: subprocess.Popen[_t.Any],
330 input: str | bytes | None,
331 logger: logging.Logger | logging.LoggerAdapter[_t.Any],
332 level: int,
333):
334 assert process.stdout
335 assert process.stderr
336 if input is not None:
337 assert process.stdin
338 if process.text_mode: # type: ignore
339 input = input.encode(process.stdin.encoding, process.stdin.errors) # type: ignore
340 input_data = memoryview(input) # type: ignore
341 else:
342 input_data = None
344 stdout: list[bytes] = []
345 stderr: list[bytes] = []
347 last_line = ""
349 def read_stderr(selector: selectors.BaseSelector, fd: int, fh: _t.IO[_t.Any]):
350 nonlocal last_line
351 text = os.read(fd, 32 * 1024)
352 if not text:
353 selector.unregister(fd)
354 fh.close()
355 return
356 stderr.append(text)
357 if isinstance(text, bytes):
358 text = text.decode(errors="replace")
359 for line in text.splitlines(keepends=True):
360 if not line.endswith("\n"):
361 last_line += line
362 else:
363 logger.log(level, "-> %s", last_line + line.rstrip("\r\n"))
364 last_line = ""
366 def read_stdout(selector: selectors.BaseSelector, fd: int, fh: _t.IO[_t.Any]):
367 text = os.read(fd, 32 * 1024)
368 if not text:
369 selector.unregister(fd)
370 fh.close()
371 return
372 stdout.append(text)
374 index = 0
376 def write_stdin(selector: selectors.BaseSelector, fd: int, fh: _t.IO[_t.Any]):
377 nonlocal index
378 assert input_data is not None
379 try:
380 index += os.write(fd, input_data[index : index + select.PIPE_BUF])
381 except BrokenPipeError:
382 selector.unregister(fd)
383 fh.close()
384 return
385 if index >= len(input_data):
386 selector.unregister(fd)
387 fh.close()
389 with _Selector() as selector:
390 selector.register(process.stderr, selectors.EVENT_READ, read_stderr)
391 selector.register(process.stdout, selectors.EVENT_READ, read_stdout)
392 if process.stdin is not None:
393 selector.register(process.stdin, selectors.EVENT_WRITE, write_stdin)
394 try:
395 process.stdin.flush()
396 except BrokenPipeError:
397 pass
399 while selector.get_map():
400 for key, _ in selector.select():
401 key.data(selector, key.fd, key.fileobj)
403 if process.stdin is not None:
404 assert process.stdin.closed
406 return _decode(process, stdout, process.stdout), _decode(
407 process, stderr, process.stderr
408 )
411def _decode(
412 process: subprocess.Popen[_t.Any], lines: list[bytes], stream: _t.IO[_t.Any]
413):
414 if not process.text_mode: # type: ignore
415 return lines
416 raw = b"".join(lines)
417 text = raw.decode(stream.encoding, stream.errors) # type: ignore
418 return [text.replace("\r\n", "\n").replace("\r", "\n")]
421def _process_io_nocap(
422 process: subprocess.Popen[_t.Any],
423 input: str | bytes | None,
424 logger: logging.Logger | logging.LoggerAdapter[_t.Any],
425 level: int,
426):
427 if input is not None:
428 assert process.stdin is not None
429 try:
430 process.stdin.write(input)
431 process.stdin.flush()
432 except BrokenPipeError:
433 pass
434 process.stdin.close()
436 return None, None