Coverage for yuio / exec.py: 87%

181 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-05 11:41 +0000

1# Yuio project, MIT license. 

2# 

3# https://github.com/taminomara/yuio/ 

4# 

5# You're free to copy this file to your project and edit it for your needs, 

6# just keep this copyright line please :3 

7 

8""" 

9This module provides helpers to run subprocesses and get their output. 

10 

11It handles subprocess' stderr and stdout in a way that doesn't break 

12loggers from :mod:`yuio.io`. 

13 

14.. autofunction:: exec 

15 

16.. autoclass:: ExecError 

17 

18""" 

19 

20from __future__ import annotations 

21 

22import contextlib 

23import logging 

24import os 

25import pathlib 

26import select 

27import selectors 

28import subprocess 

29import threading 

30 

31from typing import TYPE_CHECKING 

32 

33if TYPE_CHECKING: 

34 import typing_extensions as _t 

35else: 

36 from yuio import _typing as _t 

37 

38__all__ = [ 

39 "ExecError", 

40 "exec", 

41] 

42 

43_logger = logging.getLogger(__name__) 

44 

45 

class ExecError(subprocess.CalledProcessError):
    """
    Error raised by :func:`exec` when the command exits with a non-zero status.

    .. py:data:: returncode
        :type: int

        Exit status reported by the command.

    .. py:data:: cmd
        :type: tuple[str | pathlib.Path, ...]

        The `args` that were originally given to :func:`exec`.

    .. py:data:: output
                 stdout
        :type: str | bytes | None

        Whatever the command wrote to stdout, if it was captured.

    .. py:data:: stderr
        :type: str | bytes | None

        Whatever the command wrote to stderr, if it was captured.

    """

    def __str__(self):
        # Start from the standard ``CalledProcessError`` message and append
        # the captured stderr (when there is any) so the failure is readable
        # without inspecting the exception's attributes.
        message = super().__str__()
        captured = getattr(self, "stderr", None)
        if not captured:
            return message
        if isinstance(captured, bytes):
            captured = captured.decode(errors="replace")
        return message + "\n\nStderr:\n" + captured

80 

81 

@_t.overload
def exec(
    *args: str | pathlib.Path,
    cwd: str | pathlib.Path | None = None,
    env: dict[str, str] | None = None,
    input: str | None = None,
    capture_io: _t.Literal[True] = True,
    logger: logging.Logger | logging.LoggerAdapter[_t.Any] | str | None = None,
    level: int = logging.DEBUG,
    text: _t.Literal[True] = True,
) -> str: ...
@_t.overload
def exec(
    *args: str | pathlib.Path,
    cwd: str | pathlib.Path | None = None,
    env: dict[str, str] | None = None,
    input: bytes | None = None,
    capture_io: _t.Literal[True] = True,
    logger: logging.Logger | logging.LoggerAdapter[_t.Any] | str | None = None,
    level: int = logging.DEBUG,
    text: _t.Literal[False],
) -> bytes: ...
@_t.overload
def exec(
    *args: str | pathlib.Path,
    cwd: str | pathlib.Path | None = None,
    env: dict[str, str] | None = None,
    input: str | None = None,
    capture_io: _t.Literal[False],
    text: _t.Literal[True] = True,
) -> None: ...
@_t.overload
def exec(
    *args: str | pathlib.Path,
    cwd: str | pathlib.Path | None = None,
    env: dict[str, str] | None = None,
    input: bytes | None = None,
    capture_io: _t.Literal[False],
    text: _t.Literal[False],
) -> None: ...
@_t.overload
def exec(
    *args: str | pathlib.Path,
    cwd: None | str | pathlib.Path = None,
    env: dict[str, str] | None = None,
    capture_io: bool = True,
    input: None | str | bytes = None,
    logger: logging.Logger | logging.LoggerAdapter[_t.Any] | str | None = None,
    level: int | None = None,
    # Kept in sync with the implementation's default below.
    text: bool = True,
) -> str | bytes | None: ...
def exec(
    *args: str | pathlib.Path,
    cwd: None | str | pathlib.Path = None,
    env: dict[str, str] | None = None,
    capture_io: bool = True,
    input: None | str | bytes = None,
    logger: logging.Logger | logging.LoggerAdapter[_t.Any] | str | None = None,
    level: int | None = None,
    text: bool = True,
) -> str | bytes | None:
    """
    Run an executable and return its stdout.

    Command's stderr is interactively printed to the log.

    :param args:
        command arguments.
    :param cwd:
        set the current directory before the command is executed.
    :param env:
        define the environment variables for the command.
    :param input:
        string with command's stdin. If `text` is set to :data:`False`, this should
        be :class:`bytes`, otherwise it should be a :class:`str`. When given,
        it is fed to the command through a pipe regardless of `capture_io`.
    :param capture_io:
        if set to :data:`False`, process' stdout and stderr are not captured;
        `logger` and `level` arguments can't be given in this case, and this
        function returns :data:`None` instead of process' output.
    :param logger:
        logger that will be used for logging command's output. Default is to log
        to ``yuio.exec``.
    :param level:
        logging level for stderr outputs. Default is :data:`logging.DEBUG`.
    :param text:
        if set to :data:`False`, stdout is returned as :class:`bytes`.
    :returns:
        string (or bytes) with command's stdout, or :data:`None` if `capture_io`
        is :data:`False`.
    :raises:
        If the command fails, an :class:`ExecError` (a subclass of
        :class:`~subprocess.CalledProcessError`) is raised.
        If command can't be started, raises :class:`OSError`.
        If `logger` or `level` is given while `capture_io` is :data:`False`,
        raises :class:`ValueError`.

    """

    if not capture_io:
        for name, param in [
            ("logger", logger),
            ("level", level),
        ]:
            if param is not None:
                raise ValueError(f"{name} can't be specified when capture_io is False")

    level = level if level is not None else logging.DEBUG

    if logger is None:
        logger = _logger
    elif isinstance(logger, str):
        logger = logging.getLogger(logger)

    logger.log(level, " ".join(map(str, args)))

    # Explicit input always needs a pipe, even when the rest of the IO is
    # inherited; previously `capture_io=False` left stdin inherited and the
    # input could not be written. Without input, captured runs get an empty
    # stdin, and non-captured runs share ours.
    if input is not None:
        stdin = subprocess.PIPE
    elif capture_io:
        stdin = subprocess.DEVNULL
    else:
        stdin = None

    with contextlib.ExitStack() as s:
        if not capture_io:
            # The child writes straight to our terminal, so yuio's own output
            # is suspended for the duration of the call.
            import yuio.io

            s.enter_context(yuio.io.SuspendOutput())

        process = s.enter_context(
            subprocess.Popen(
                args,
                cwd=cwd,
                env=env,
                stdout=subprocess.PIPE if capture_io else None,
                stderr=subprocess.PIPE if capture_io else None,
                text=text,
                stdin=stdin,
            )
        )

        stdout, stderr = _process_io(process, capture_io, input, logger, level)

        process.wait()

        if not capture_io:
            stdout_str = None
        elif text:
            stdout_str = "".join(_t.cast(list[str], stdout))
        else:
            stdout_str = b"".join(_t.cast(list[bytes], stdout))

        if process.returncode != 0:
            if not capture_io:
                stderr_str = None
            elif text:
                stderr_str = "".join(_t.cast(list[str], stderr))
            else:
                stderr_str = b"".join(_t.cast(list[bytes], stderr))

            raise ExecError(
                process.returncode, args, output=stdout_str, stderr=stderr_str
            )

        return stdout_str

240 

241 

def _process_io(
    process: subprocess.Popen[_t.Any],
    capture_io: bool,
    input: str | bytes | None,
    logger: logging.Logger | logging.LoggerAdapter[_t.Any],
    level: int,
):
    """
    Pick the IO strategy for `process` and run it.

    Without capturing, only `input` is forwarded. With capturing, pipes are
    serviced by threads when ``os.name`` is ``"nt"`` and by a selector loop
    everywhere else.

    Returns whatever the chosen strategy returns: a ``(stdout, stderr)`` pair.

    """

    if capture_io:
        handler = _process_io_threads if os.name == "nt" else _process_io_selectors
    else:
        handler = _process_io_nocap
    return handler(process, input, logger, level)

255 

256 

def _process_io_threads(
    process: subprocess.Popen[_t.Any],
    input: str | bytes | None,
    logger: logging.Logger | logging.LoggerAdapter[_t.Any],
    level: int,
):
    """
    Service process' pipes using one reader thread per output stream.

    Stdout is collected as is. Stderr is collected and, additionally, every
    line of it is logged to `logger` at `level` with a ``"-> "`` prefix as soon
    as the line is available. `input`, if given, is written to process' stdin
    from the calling thread, after which stdin is closed.

    Expects `process` to have been started with piped stdout and stderr
    (and piped stdin when `input` is not :data:`None`).

    Returns a ``(stdout, stderr)`` pair of lists with the chunks that were
    read; chunks are :class:`str` or :class:`bytes` depending on the mode the
    pipes were opened in.

    """

    stdout = []
    stderr = []

    def read_stderr(fh: _t.IO[_t.Any]):
        # `readline` returns as soon as one line is available, whereas a bare
        # `read()` would block until EOF and delay all logging until the
        # process has finished.
        while True:
            line = fh.readline()
            if not line:
                fh.close()
                return
            stderr.append(line)
            if isinstance(line, bytes):
                line = line.decode(errors="replace")
            # The last line may arrive without a trailing newline;
            # it is logged all the same.
            logger.log(level, "-> %s", line.rstrip("\r\n"))

    def read_stdout(fh: _t.IO[_t.Any]):
        # Nobody consumes stdout incrementally, so read it in one go.
        data = fh.read()
        if data:
            stdout.append(data)
        fh.close()

    assert process.stdout
    stdout_thread = threading.Thread(
        target=read_stdout,
        args=(process.stdout,),
        name="yuio stdout handler for sub-process",
        daemon=True,
    )
    stdout_thread.start()

    assert process.stderr
    stderr_thread = threading.Thread(
        target=read_stderr,
        args=(process.stderr,),
        name="yuio stderr handler for sub-process",
        daemon=True,
    )
    stderr_thread.start()

    if input is not None:
        assert process.stdin is not None
        try:
            process.stdin.write(input)
            process.stdin.flush()
        except BrokenPipeError:
            # The process went away without reading its input; not an error.
            pass
        try:
            # Closing flushes whatever is still buffered, which fails again
            # if the pipe is already broken.
            process.stdin.close()
        except BrokenPipeError:
            pass

    stdout_thread.join()
    stderr_thread.join()

    return stdout, stderr

322 

323 

# Same choice as the subprocess implementation, which notes: "poll/select have
# the advantage of not requiring any extra file descriptor, contrarily to
# epoll/kqueue (also, they require a single syscall)." Use the poll-based
# selector where the platform offers one, and the select-based one otherwise.
_Selector = getattr(selectors, "PollSelector", selectors.SelectSelector)

331 

332 

def _process_io_selectors(
    process: subprocess.Popen[_t.Any],
    input: str | bytes | None,
    logger: logging.Logger | logging.LoggerAdapter[_t.Any],
    level: int,
):
    """
    Service process' pipes from the calling thread using a selector loop.

    Stdout and stderr are read as raw bytes straight from their file
    descriptors. Every complete stderr line is logged to `logger` at `level`
    with a ``"-> "`` prefix as it arrives; a final line without a trailing
    newline is logged once stderr reaches EOF. `input`, if given, is written
    to stdin in pieces of at most ``select.PIPE_BUF`` bytes, after which stdin
    is closed.

    Expects `process` to have been started with piped stdout and stderr
    (and piped stdin when `input` is not :data:`None`).

    Returns a ``(stdout, stderr)`` pair of lists; in text mode each list holds
    a single decoded string, otherwise the raw byte chunks.

    """

    import codecs

    assert process.stdout
    assert process.stderr
    if input is not None:
        assert process.stdin
        if process.text_mode:  # type: ignore
            # Data is written through the raw descriptor, so text input has
            # to be encoded by hand using the pipe's own settings.
            input = input.encode(process.stdin.encoding, process.stdin.errors)  # type: ignore
        input_data = memoryview(input)  # type: ignore
    else:
        input_data = None

    stdout: list[bytes] = []
    stderr: list[bytes] = []

    # Chunk boundaries fall at arbitrary bytes. An incremental decoder keeps
    # an unfinished multi-byte character until its remaining bytes arrive,
    # instead of turning both halves into replacement characters.
    log_decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
    # Text of the current stderr line that hasn't been terminated yet.
    last_line = ""

    def log_stderr(text: str, final: bool = False):
        nonlocal last_line
        for line in text.splitlines(keepends=True):
            if not line.endswith("\n"):
                last_line += line
            else:
                logger.log(level, "-> %s", last_line + line.rstrip("\r\n"))
                last_line = ""
        if final and last_line:
            # Stream ended in the middle of a line; don't lose it.
            logger.log(level, "-> %s", last_line.rstrip("\r\n"))
            last_line = ""

    def read_stderr(selector: selectors.BaseSelector, fd: int, fh: _t.IO[_t.Any]):
        data = os.read(fd, 32 * 1024)
        if not data:
            selector.unregister(fd)
            fh.close()
            log_stderr(log_decoder.decode(b"", final=True), final=True)
            return
        stderr.append(data)
        log_stderr(log_decoder.decode(data))

    def read_stdout(selector: selectors.BaseSelector, fd: int, fh: _t.IO[_t.Any]):
        data = os.read(fd, 32 * 1024)
        if not data:
            selector.unregister(fd)
            fh.close()
            return
        stdout.append(data)

    # Number of input bytes that were already written to stdin.
    index = 0

    def write_stdin(selector: selectors.BaseSelector, fd: int, fh: _t.IO[_t.Any]):
        nonlocal index
        assert input_data is not None
        try:
            index += os.write(fd, input_data[index : index + select.PIPE_BUF])
        except BrokenPipeError:
            # The process stopped reading; give up on the rest of the input.
            selector.unregister(fd)
            fh.close()
            return
        if index >= len(input_data):
            selector.unregister(fd)
            fh.close()

    with _Selector() as selector:
        selector.register(process.stderr, selectors.EVENT_READ, read_stderr)
        selector.register(process.stdout, selectors.EVENT_READ, read_stdout)
        if process.stdin is not None:
            selector.register(process.stdin, selectors.EVENT_WRITE, write_stdin)
            try:
                process.stdin.flush()
            except BrokenPipeError:
                pass

        # Every handler unregisters its stream once it is done with it,
        # so an empty map means that all pipes were closed.
        while selector.get_map():
            for key, _ in selector.select():
                key.data(selector, key.fd, key.fileobj)

    if process.stdin is not None:
        assert process.stdin.closed

    return _decode(process, stdout, process.stdout), _decode(
        process, stderr, process.stderr
    )

414 

415 

def _decode(
    process: subprocess.Popen[_t.Any], lines: list[bytes], stream: _t.IO[_t.Any]
):
    """
    Convert raw chunks read from `stream` to what the process' mode calls for.

    For a process that is not in text mode, `lines` is returned untouched.
    Otherwise the chunks are concatenated, decoded with the encoding and error
    handler of `stream`, and ``"\\r\\n"`` and ``"\\r"`` are replaced with
    ``"\\n"``; the result is returned as a list with a single string.

    """

    if process.text_mode:  # type: ignore
        decoded = b"".join(lines).decode(stream.encoding, stream.errors)  # type: ignore
        # Order matters: "\r\n" must collapse before lone "\r" is handled.
        for newline in ("\r\n", "\r"):
            decoded = decoded.replace(newline, "\n")
        return [decoded]
    return lines

424 

425 

def _process_io_nocap(
    process: subprocess.Popen[_t.Any],
    input: str | bytes | None,
    logger: logging.Logger | logging.LoggerAdapter[_t.Any],
    level: int,
):
    """
    Feed `input` to a process whose output is not captured.

    If `input` is not :data:`None`, it is written to process' stdin, and stdin
    is closed afterwards; `process` must have been started with a piped stdin
    in this case. A process that exits without consuming its input is not
    treated as an error. `logger` and `level` are accepted for signature
    parity with other IO strategies and are not used.

    Always returns ``(None, None)``, since there is no captured output.

    """

    if input is not None:
        assert process.stdin is not None
        try:
            process.stdin.write(input)
            process.stdin.flush()
        except BrokenPipeError:
            # The process went away without reading its input; not an error.
            pass
        try:
            # Closing flushes whatever is still buffered, which fails again
            # if the pipe is already broken.
            process.stdin.close()
        except BrokenPipeError:
            pass

    return None, None