|
6 | 6 |
|
7 | 7 | import asyncio |
8 | 8 | import contextlib |
| 9 | +import os |
| 10 | +import pty |
| 11 | +import select |
9 | 12 | import shlex |
10 | 13 | import signal |
11 | 14 | import subprocess |
| 15 | +import sys |
| 16 | +import termios |
12 | 17 | import time |
| 18 | +import tty |
13 | 19 | from collections.abc import Hashable, Iterable |
14 | 20 | from dataclasses import dataclass |
15 | 21 | from typing import TextIO |
@@ -43,6 +49,8 @@ class LocalRuntimeBackend(RuntimeBackend): |
43 | 49 | DEFAULT_SIGTERM_TIMEOUT = 2.0 # in seconds |
44 | 50 | DEFAULT_SIGKILL_TIMEOUT = 2.0 # in seconds |
45 | 51 |
|
| 52 | + INTERACTIVE_TEE_READ_SIZE = 1024 # Read 1024 bytes to balance efficiency with responsiveness |
| 53 | + |
46 | 54 | def __init__( |
47 | 55 | self, |
48 | 56 | *, |
@@ -143,22 +151,53 @@ def _launch_interactive_job( |
143 | 151 |
|
144 | 152 | if log_file is not None: |
145 | 153 | try: |
146 | | - proc = subprocess.Popen( |
147 | | - shlex.split(job.cmd), |
148 | | - # Transparent stdin/stdout, stdout & stderr muxed and tee'd via the pipe. |
149 | | - stdin=None, |
150 | | - stdout=subprocess.PIPE, |
151 | | - stderr=subprocess.STDOUT, |
152 | | - universal_newlines=True, |
153 | | - env=env, |
154 | | - ) |
155 | | - if proc.stdout is not None: |
156 | | - for line in proc.stdout: |
157 | | - print(line, end="") # noqa: T201 |
158 | | - log_file.write(line) |
159 | | - log_file.flush() |
160 | | - |
161 | | - exit_code = proc.wait() |
| 154 | + # Expose a pseudo-terminal to the subprocess, as libc checks whether stdout is a |
| 155 | + # TTY when a process writes to stdout to determine line buffering behaviour. |
| 156 | + ptm, pts = pty.openpty() |
| 157 | + |
| 158 | + # We want the terminal to operate in raw mode to disable default line discipline. |
| 159 | + # Save the old settings so that we can restore them afterwards. |
| 160 | + stdin_fd = sys.stdin.fileno() |
| 161 | + old_settings = termios.tcgetattr(stdin_fd) |
| 162 | + |
| 163 | + try: |
| 164 | + tty.setraw(ptm) |
| 165 | + |
| 166 | + # Launch the subprocess - mux stdout and stderr via the PTY and tee it. |
| 167 | + # stdin is transparent via sys.stdin |
| 168 | + proc = subprocess.Popen( |
| 169 | + shlex.split(job.cmd), |
| 170 | + stdin=sys.stdin, |
| 171 | + stdout=pts, |
| 172 | + stderr=pts, |
| 173 | + universal_newlines=True, |
| 174 | + env=env, |
| 175 | + ) |
| 176 | + |
| 177 | + # Close the backend pts reference so that the ptm reaches EOF properly |
| 178 | + os.close(pts) |
| 179 | + |
| 180 | + # Read out data from stdout/stderr and tee it to the log file until EOF reached. |
| 181 | + while True: |
| 182 | + chunk = None |
| 183 | + try: |
| 184 | + readable, _, _ = select.select([ptm], [], []) |
| 185 | + if ptm in readable: |
| 186 | + chunk = os.read(ptm, self.INTERACTIVE_TEE_READ_SIZE) |
| 187 | + if not chunk: |
| 188 | + break |
| 189 | + except OSError: |
| 190 | + break |
| 191 | + if chunk is not None: |
| 192 | + sys.stdout.buffer.write(chunk) |
| 193 | + sys.stdout.buffer.flush() |
| 194 | + log_file.write(chunk.decode(encoding="utf-8", errors="surrogateescape")) |
| 195 | + log_file.flush() |
| 196 | + |
| 197 | + exit_code = proc.wait() |
| 198 | + finally: |
| 199 | + # Restore old terminal settings after all pending output is written (TCSADRAIN) |
| 200 | + termios.tcsetattr(stdin_fd, termios.TCSADRAIN, old_settings) |
162 | 201 | except subprocess.SubprocessError as e: |
163 | 202 | log_file.close() |
164 | 203 | log.exception("Error launching job subprocess: %s", job.full_name) |
|
0 commit comments