Coverage for yuio / exec.py: 87%

180 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-04 10:05 +0000

1# Yuio project, MIT license. 

2# 

3# https://github.com/taminomara/yuio/ 

4# 

5# You're free to copy this file to your project and edit it for your needs, 

6# just keep this copyright line please :3 

7 

8""" 

9This module provides helpers to run subprocesses and get their output. 

10 

11It handles subprocess' stderr and stdout in a way that doesn't break 

12loggers from :mod:`yuio.io`. 

13 

14.. autofunction:: exec 

15 

16.. autoclass:: ExecError 

17 

18""" 

19 

20from __future__ import annotations 

21 

22import contextlib 

23import logging 

24import os 

25import pathlib 

26import select 

27import selectors 

28import subprocess 

29import threading 

30 

31from yuio import _typing as _t 

32 

# Names exported by ``from yuio.exec import *``.
__all__ = [
    "ExecError",
    "exec",
]

# Module-level logger; :func:`exec` falls back to it when no logger is given.
_logger = logging.getLogger(__name__)

39 

40 

class ExecError(subprocess.CalledProcessError):
    """
    Error raised by :func:`exec` when the command exits with a non-zero status.

    Its string representation is the standard
    :class:`~subprocess.CalledProcessError` message followed by the captured
    stderr, if there is any.

    .. py:data:: returncode
        :type: int

        Return code of the called command.

    .. py:data:: cmd
        :type: tuple[str | pathlib.Path, ...]

        Initial ``args`` passed to the :func:`exec`.

    .. py:data:: output
                 stdout
        :type: str | bytes | None

        Captured stdout.

    .. py:data:: stderr
        :type: str | bytes | None

        Captured stderr.

    """

    def __str__(self):
        message = super().__str__()

        captured = getattr(self, "stderr", None)
        if not captured:
            # Nothing was captured (or capturing was disabled): keep the
            # base class message untouched.
            return message

        if isinstance(captured, bytes):
            # Binary mode: render undecodable bytes as U+FFFD instead of failing.
            captured = captured.decode(errors="replace")

        return message + "\n\nStderr:\n" + captured

75 

76 

@_t.overload
def exec(
    *args: str | pathlib.Path,
    cwd: str | pathlib.Path | None = None,
    env: dict[str, str] | None = None,
    input: str | None = None,
    capture_io: _t.Literal[True] = True,
    logger: logging.Logger | logging.LoggerAdapter[_t.Any] | str | None = None,
    level: int = logging.DEBUG,
    text: _t.Literal[True] = True,
) -> str: ...
@_t.overload
def exec(
    *args: str | pathlib.Path,
    cwd: str | pathlib.Path | None = None,
    env: dict[str, str] | None = None,
    input: bytes | None = None,
    capture_io: _t.Literal[True] = True,
    logger: logging.Logger | logging.LoggerAdapter[_t.Any] | str | None = None,
    level: int = logging.DEBUG,
    text: _t.Literal[False],
) -> bytes: ...
@_t.overload
def exec(
    *args: str | pathlib.Path,
    cwd: str | pathlib.Path | None = None,
    env: dict[str, str] | None = None,
    input: str | None = None,
    capture_io: _t.Literal[False],
    text: _t.Literal[True] = True,
) -> None: ...
@_t.overload
def exec(
    *args: str | pathlib.Path,
    cwd: str | pathlib.Path | None = None,
    env: dict[str, str] | None = None,
    input: bytes | None = None,
    capture_io: _t.Literal[False],
    text: _t.Literal[False],
) -> None: ...
@_t.overload
def exec(
    *args: str | pathlib.Path,
    cwd: None | str | pathlib.Path = None,
    env: dict[str, str] | None = None,
    capture_io: bool = True,
    input: None | str | bytes = None,
    logger: logging.Logger | logging.LoggerAdapter[_t.Any] | str | None = None,
    level: int | None = None,
    # Catch-all overload: mirrors the implementation signature, including
    # the ``text=True`` default.
    text: bool = True,
) -> str | bytes | None: ...
def exec(
    *args: str | pathlib.Path,
    cwd: None | str | pathlib.Path = None,
    env: dict[str, str] | None = None,
    capture_io: bool = True,
    input: None | str | bytes = None,
    logger: logging.Logger | logging.LoggerAdapter[_t.Any] | str | None = None,
    level: int | None = None,
    text: bool = True,
) -> str | bytes | None:
    """
    Run an executable and return its stdout.

    Command's stderr is interactively printed to the log.

    :param args:
        command arguments.
    :param cwd:
        set the current directory before the command is executed.
    :param env:
        define the environment variables for the command.
    :param input:
        string with command's stdin. If ``text`` is set to :data:`False`, this should
        be :class:`bytes`, otherwise it should be a :class:`str`. When given,
        it is fed to the command through a pipe regardless of ``capture_io``.
    :param capture_io:
        if set to :data:`False`, process' stdout and stderr are not captured;
        ``logger`` and ``level`` arguments can't be given in this case, and this
        function returns :data:`None` instead of process' output.
    :param logger:
        logger (or a logger name) that will be used for logging command's
        output. Default is to log to ``yuio.exec``.
    :param level:
        logging level for stderr outputs. Default is :data:`logging.DEBUG`.
    :param text:
        if set to :data:`False`, stdout is returned as :class:`bytes`.
    :returns:
        string (or bytes) with command's stdout, or :data:`None` if ``capture_io``
        is :data:`False`.
    :raises:
        If the command exits with a non-zero status, an :class:`ExecError`
        (a subclass of :class:`~subprocess.CalledProcessError`) is raised.
        If command can't be started, raises :class:`OSError`. If ``logger``
        or ``level`` is given while ``capture_io`` is :data:`False`, raises
        :class:`ValueError`.

    """

    if not capture_io:
        for name, param in (("logger", logger), ("level", level)):
            if param is not None:
                raise ValueError(f"{name} can't be specified when capture_io is False")

    if level is None:
        level = logging.DEBUG

    if logger is None:
        logger = _logger
    elif isinstance(logger, str):
        logger = logging.getLogger(logger)

    # The command line itself is logged at the same level as its stderr.
    logger.log(level, " ".join(map(str, args)))

    # Explicit input always needs a pipe, even when output is not captured;
    # otherwise ``process.stdin`` would be ``None`` and the input could not be
    # delivered. Without input, a captured process gets an empty stdin and a
    # non-captured one inherits ours.
    if input is not None:
        stdin = subprocess.PIPE
    elif capture_io:
        stdin = subprocess.DEVNULL
    else:
        stdin = None

    with contextlib.ExitStack() as s:
        if not capture_io:
            # Imported lazily, only on the path that needs it.
            import yuio.io

            # The child writes straight to our stdout/stderr; suspend yuio's
            # own output for the duration.
            s.enter_context(yuio.io.SuspendOutput())

        process = s.enter_context(
            subprocess.Popen(
                args,
                cwd=cwd,
                env=env,
                stdout=subprocess.PIPE if capture_io else None,
                stderr=subprocess.PIPE if capture_io else None,
                text=text,
                stdin=stdin,
            )
        )

        stdout, stderr = _process_io(process, capture_io, input, logger, level)

        process.wait()

    if not capture_io:
        stdout_str = None
    elif text:
        stdout_str = "".join(_t.cast(list[str], stdout))
    else:
        stdout_str = b"".join(_t.cast(list[bytes], stdout))

    if process.returncode != 0:
        if not capture_io:
            stderr_str = None
        elif text:
            stderr_str = "".join(_t.cast(list[str], stderr))
        else:
            stderr_str = b"".join(_t.cast(list[bytes], stderr))

        raise ExecError(
            process.returncode, args, output=stdout_str, stderr=stderr_str
        )

    return stdout_str

235 

236 

def _process_io(
    process: subprocess.Popen[_t.Any],
    capture_io: bool,
    input: str | bytes | None,
    logger: logging.Logger | logging.LoggerAdapter[_t.Any],
    level: int,
):
    """
    Feed ``input`` to the process and collect its output.

    Picks the I/O strategy: no collection at all when ``capture_io`` is
    false, reader threads when ``os.name`` is ``"nt"``, and a selector loop
    everywhere else. Returns whatever the chosen strategy returns,
    a ``(stdout, stderr)`` pair.

    """

    if capture_io:
        handler = _process_io_threads if os.name == "nt" else _process_io_selectors
    else:
        handler = _process_io_nocap
    return handler(process, input, logger, level)

250 

251 

def _process_io_threads(
    process: subprocess.Popen[_t.Any],
    input: str | bytes | None,
    logger: logging.Logger | logging.LoggerAdapter[_t.Any],
    level: int,
):
    """
    Collect process output using one reader thread per stream.

    Stderr is consumed line by line, so every line is logged (prefixed
    with ``"-> "``) as soon as it is available. Stdout is read in one go.
    ``input``, if given, is written to the process' stdin from the calling
    thread, after which stdin is closed.

    :param process:
        process started with piped stdout and stderr (and piped stdin
        if ``input`` is given).
    :param input:
        data for process' stdin, or :data:`None`.
    :param logger:
        logger that receives stderr lines.
    :param level:
        level with which stderr lines are logged.
    :returns:
        tuple ``(stdout, stderr)``, each a list of chunks exactly as they were
        read from the stream (:class:`str` or :class:`bytes`, depending
        on the stream mode).

    """

    stdout = []
    stderr = []

    def read_stderr(fh: _t.IO[_t.Any]):
        pending = ""
        with fh:
            # Iterating the stream yields lines as they arrive; a single
            # ``read()`` would block until EOF and delay all logging until
            # the process has finished.
            for chunk in fh:
                stderr.append(chunk)
                text = (
                    chunk.decode(errors="replace")
                    if isinstance(chunk, bytes)
                    else chunk
                )
                for line in text.splitlines(keepends=True):
                    if line.endswith("\n"):
                        logger.log(level, "-> %s", (pending + line).rstrip("\r\n"))
                        pending = ""
                    else:
                        pending += line
        if pending:
            # The output did not end with a newline; don't lose its last line.
            logger.log(level, "-> %s", pending.rstrip("\r\n"))

    def read_stdout(fh: _t.IO[_t.Any]):
        with fh:
            data = fh.read()
            if data:
                stdout.append(data)

    assert process.stdout
    stdout_thread = threading.Thread(
        target=read_stdout,
        args=(process.stdout,),
        name="yuio stdout handler for sub-process",
    )
    stdout_thread.daemon = True
    stdout_thread.start()

    assert process.stderr
    stderr_thread = threading.Thread(
        target=read_stderr,
        args=(process.stderr,),
        name="yuio stderr handler for sub-process",
    )
    stderr_thread.daemon = True
    stderr_thread.start()

    if input is not None:
        assert process.stdin is not None
        try:
            process.stdin.write(input)
            process.stdin.flush()
        except BrokenPipeError:
            # The process exited without consuming all of its input.
            pass
        try:
            # ``close()`` flushes once more, so it raises the same error again
            # if the data could not be delivered above.
            process.stdin.close()
        except BrokenPipeError:
            pass

    stdout_thread.join()
    stderr_thread.join()

    return stdout, stderr

317 

318 

# Selector choice follows the rationale given in the subprocess implementation:
# "poll/select have the advantage of not requiring any extra file descriptor,
# contrarily to epoll/kqueue (also, they require a single syscall)."
try:
    _Selector = selectors.PollSelector
except AttributeError:
    # No ``PollSelector`` in this build of ``selectors``; fall back to select().
    _Selector = selectors.SelectSelector

326 

327 

def _process_io_selectors(
    process: subprocess.Popen[_t.Any],
    input: str | bytes | None,
    logger: logging.Logger | logging.LoggerAdapter[_t.Any],
    level: int,
):
    """
    Collect process output using a single selector loop.

    Stdout and stderr are read from their file descriptors as raw bytes
    whenever they become readable, and ``input`` (if any) is written to stdin
    whenever it becomes writable. Every complete stderr line is logged with
    the ``"-> "`` prefix as soon as it is received; an unterminated last line
    is logged when stderr reaches EOF.

    :param process:
        process started with piped stdout and stderr (and piped stdin
        if ``input`` is given).
    :param input:
        data for process' stdin, or :data:`None`. In text mode it is encoded
        with stdin's encoding and error handler before being written.
    :param logger:
        logger that receives stderr lines.
    :param level:
        level with which stderr lines are logged.
    :returns:
        tuple ``(stdout, stderr)``; each element is the collected chunks passed
        through :func:`_decode`.

    """

    import codecs

    assert process.stdout
    assert process.stderr
    if input is not None:
        assert process.stdin
        if process.text_mode:  # type: ignore
            input = input.encode(process.stdin.encoding, process.stdin.errors)  # type: ignore
        input_data = memoryview(input)  # type: ignore
    else:
        input_data = None

    stdout: list[bytes] = []
    stderr: list[bytes] = []

    # Chunk boundaries are arbitrary and may fall inside a multi-byte
    # character. An incremental decoder keeps the incomplete tail until
    # the next chunk arrives instead of replacing both halves with U+FFFD.
    log_decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
    last_line = ""

    def log_stderr(chunk: bytes, final: bool = False):
        # Log all complete lines in ``chunk``; keep the unfinished tail.
        nonlocal last_line
        text = log_decoder.decode(chunk, final)
        for line in text.splitlines(keepends=True):
            if line.endswith("\n"):
                # Strip after joining, so that a "\r\n" split between two
                # chunks doesn't leave a stray "\r" in the message.
                logger.log(level, "-> %s", (last_line + line).rstrip("\r\n"))
                last_line = ""
            else:
                last_line += line
        if final and last_line:
            # The output did not end with a newline; don't lose its last line.
            logger.log(level, "-> %s", last_line.rstrip("\r\n"))
            last_line = ""

    def read_stderr(selector: selectors.BaseSelector, fd: int, fh: _t.IO[_t.Any]):
        chunk = os.read(fd, 32 * 1024)
        if not chunk:
            # EOF.
            selector.unregister(fd)
            fh.close()
            log_stderr(b"", final=True)
            return
        stderr.append(chunk)
        log_stderr(chunk)

    def read_stdout(selector: selectors.BaseSelector, fd: int, fh: _t.IO[_t.Any]):
        chunk = os.read(fd, 32 * 1024)
        if not chunk:
            # EOF.
            selector.unregister(fd)
            fh.close()
            return
        stdout.append(chunk)

    index = 0

    def write_stdin(selector: selectors.BaseSelector, fd: int, fh: _t.IO[_t.Any]):
        nonlocal index
        assert input_data is not None
        try:
            # Write at most PIPE_BUF bytes per readiness event.
            index += os.write(fd, input_data[index : index + select.PIPE_BUF])
        except BrokenPipeError:
            # The process exited without consuming all of its input.
            selector.unregister(fd)
            fh.close()
            return
        if index >= len(input_data):
            selector.unregister(fd)
            fh.close()

    with _Selector() as selector:
        selector.register(process.stderr, selectors.EVENT_READ, read_stderr)
        selector.register(process.stdout, selectors.EVENT_READ, read_stdout)
        if process.stdin is not None:
            selector.register(process.stdin, selectors.EVENT_WRITE, write_stdin)
            try:
                process.stdin.flush()
            except BrokenPipeError:
                pass

        # Every handler unregisters its stream once it's done, so the loop
        # ends when all streams are closed.
        while selector.get_map():
            for key, _ in selector.select():
                key.data(selector, key.fd, key.fileobj)

    if process.stdin is not None:
        assert process.stdin.closed

    return _decode(process, stdout, process.stdout), _decode(
        process, stderr, process.stderr
    )

409 

410 

def _decode(
    process: subprocess.Popen[_t.Any], lines: list[bytes], stream: _t.IO[_t.Any]
):
    """
    Convert raw chunks read from ``stream`` to what the process' mode implies.

    For a process that is not in text mode, ``lines`` is returned as is.
    Otherwise the chunks are concatenated, decoded with the stream's encoding
    and error handler, and ``"\\r\\n"`` and ``"\\r"`` line endings are replaced
    with ``"\\n"``; the result is a list with a single string.

    """

    if process.text_mode:  # type: ignore
        decoded = b"".join(lines).decode(stream.encoding, stream.errors)  # type: ignore
        # Order matters: "\r\n" must be collapsed before lone "\r".
        for eol in ("\r\n", "\r"):
            decoded = decoded.replace(eol, "\n")
        return [decoded]
    return lines

419 

420 

def _process_io_nocap(
    process: subprocess.Popen[_t.Any],
    input: str | bytes | None,
    logger: logging.Logger | logging.LoggerAdapter[_t.Any],
    level: int,
):
    """
    Feed ``input`` to a process whose output is not captured.

    If ``input`` is given, it is written to the process' stdin, and stdin
    is closed. Broken pipe errors are ignored on both steps.

    :param process:
        process to feed; its stdin must be a pipe if ``input`` is given.
    :param input:
        data for process' stdin, or :data:`None`.
    :param logger:
        not used by this function.
    :param level:
        not used by this function.
    :returns:
        always ``(None, None)``, since nothing is captured.

    """

    if input is not None:
        assert process.stdin is not None
        try:
            process.stdin.write(input)
            process.stdin.flush()
        except BrokenPipeError:
            # The process exited without consuming all of its input.
            pass
        try:
            # ``close()`` flushes once more, so it raises the same error again
            # if the data could not be delivered above.
            process.stdin.close()
        except BrokenPipeError:
            pass

    return None, None