# components/_impl/workers/subprocess_worker.py
import contextlib
import datetime
import io
import marshal
import os
import pathlib
import shutil
import signal
import subprocess
import sys
import tempfile
import textwrap
import time
import typing

import components
from components._impl.workers import base
from components._impl.workers import subprocess_rpc


class SubprocessWorker(base.WorkerBase):
"""Open a subprocess using `python -i`, and use it to execute code.
This class wraps a subprocess which runs a clean instance of Python.
This enables hermetic execution of stateful code, GIL free concurrent
benchmarking, and easy use of command line tools from Python.
When using SubprocessWorker, it is important to remember that while the
environment is (or at least tries to be) identical to the parent, it does
not share state or initialization with the parent process. Imports must be
re-run in the worker, and shared resources (such as file descriptors) will
not be available. For most applications this mirrors the semantics of
`timeit.Timer`.
The principle extension point for SubprocessWorker is the `args`
property. By overriding it, subclasses can change the nature of the
underlying subprocess while reusing all of the generic communication and
fault handling facilities of the base class. For example, suppose we want
to use TaskSet to pin the worker to a single core. The code is simply:
```
class TasksetZeroWorker(SubprocessWorker):
@property
def args(self) -> typing.List[str]:
return ["taskset", "--cpu-list", "0"] + super().args
```
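
    A minimal usage sketch (assumes the parent process can import the
    `components` package, since the worker inherits the parent's `sys.path`):

    ```
    worker = SubprocessWorker(timeout=60.0)
    worker.run("y = 2 ** 10")
    assert worker.load("y") == 1024
    ```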
"""
_working_dir: str
_alive: bool = False
_bootstrap_timeout: int = 10 # seconds

    def __init__(self, timeout: typing.Optional[float] = None) -> None:
super().__init__()
# Log inputs and outputs for debugging.
self._command_log = os.path.join(self.working_dir, "commands.log")
pathlib.Path(self._command_log).touch()
self._stdout_f: io.FileIO = io.FileIO(
os.path.join(self.working_dir, "stdout.txt"), mode="w",
)
self._stderr_f: io.FileIO = io.FileIO(
os.path.join(self.working_dir, "stderr.txt"), mode="w",
)
# `self._run` has strong assumptions about how `_input_pipe` and
# `_output_pipe` are used. They should not be accessed in any other
# context. (The same is true for `self.load` and `_load_pipe`.)
self._input_pipe = subprocess_rpc.Pipe()
self._output_pipe = subprocess_rpc.Pipe(
timeout=timeout,
timeout_callback=self._kill_proc,
)
self._load_pipe = subprocess_rpc.Pipe(
timeout=timeout,
timeout_callback=self._kill_proc,
)
# Windows and Unix differ in how pipes are shared with children.
# In Unix they are inherited, while in Windows the child consults the
# OS to get access. Most of this complexity is handled by
# `subprocess_rpc.Pipe`, however we also have to make sure Popen
# exposes the pipes in a platform appropriate way.
child_fds = [
self._input_pipe.read_fd,
self._output_pipe.write_fd,
self._load_pipe.write_fd,
]
if subprocess_rpc.IS_WINDOWS:
for fd in child_fds:
os.set_inheritable(fd, True)
startupinfo = subprocess.STARTUPINFO()
startupinfo.lpAttributeList["handle_list"].extend(
[subprocess_rpc.to_handle(fd) for fd in child_fds])
popen_kwargs = {
"startupinfo": startupinfo,
}
else:
popen_kwargs = {
"close_fds": True,
"pass_fds": child_fds,
}
self._proc = subprocess.Popen(
args=self.args,
stdin=subprocess.PIPE,
stdout=self._stdout_f,
stderr=self._stderr_f,
encoding=subprocess_rpc.ENCODING,
bufsize=1,
cwd=os.getcwd(),
**popen_kwargs,
)
        # Record the pid of the child process in the output pipe.
self._output_pipe.set_writer_pid(self._proc.pid)
self._worker_bootstrap_finished: bool = False
self._bootstrap_worker()
self._alive = True

    @property
def working_dir(self) -> str:
# A subclass might need to access `self.working_dir` before calling
# `super().__init__` in order to properly construct `args`, so we need
# to lazily initialize it.
if getattr(self, "_working_dir", None) is None:
self._working_dir = tempfile.mkdtemp()
return self._working_dir

    @property
def args(self) -> typing.List[str]:
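        # `-i` keeps the interpreter alive and reading commands from stdin,
        # and `-u` makes stdout/stderr unbuffered so output reaches the log
        # files immediately.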
return [sys.executable, "-i", "-u"]

    def run(self, snippet: str) -> None:
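        """Execute `snippet` in the worker and block until it completes."""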
self._run(snippet)

    def store(self, name: str, value: typing.Any, in_memory: bool = False) -> None:
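        """Serialize `value` with `marshal` and bind it to `name` in the worker."""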
if in_memory:
raise NotImplementedError("SubprocessWorker does not support `in_memory`")
# NB: we convert the bytes to a hex string to avoid encoding issues.
self._run(f"""
{name} = {subprocess_rpc.WORKER_IMPL_NAMESPACE}["marshal"].loads(
bytes.fromhex({repr(marshal.dumps(value).hex())})
)
""")

    def load(self, name: str) -> typing.Any:
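        """Return the value of `name` in the worker, deserialized via `marshal`."""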
self._run(f"""
{subprocess_rpc.WORKER_IMPL_NAMESPACE}["load_pipe"].write(
{subprocess_rpc.WORKER_IMPL_NAMESPACE}["marshal"].dumps({name})
)
""")
return marshal.loads(self._load_pipe.read())

    @property
def in_process(self) -> bool:
return False

    @property
def alive(self) -> bool:
return self._alive and self._proc.poll() is None

    def _bootstrap_worker(self) -> None:
        """Import subprocess_rpc in the worker, and start the work loop.

        Commands are executed by writing to `self._input_pipe`, and waiting for
a response on `self._output_pipe`. This presumes, however, that there
is a worker doing the opposite: listening to the input pipe and writing
to the output pipe. At startup `self._proc` is a simple interactive
Python process, so we have to bootstrap it to start the work loop or
else `self._run` will hang waiting for jobs to be processed.
"""
# NB: This gets sent directly to `self._proc`'s stdin, so it MUST be
# a single expression and may NOT contain any empty lines. (Due to
# how Python processes commands.)
bootstrap_command = textwrap.dedent(f"""
try:
import marshal
import sys
sys_path_old = list(sys.path)
sys.path = marshal.loads(
bytes.fromhex({repr(marshal.dumps(sys.path).hex())})
)
# The parent gets priority, but a subclass could set PYTHONPATH
# so we have to respect extra paths.
sys.path.extend([i for i in sys_path_old if i and i not in sys.path])
from components._impl.workers import subprocess_rpc
output_pipe = subprocess_rpc.Pipe(
write_handle={self._output_pipe.write_handle})
output_pipe.write(subprocess_rpc.BOOTSTRAP_IMPORT_SUCCESS)
subprocess_rpc.run_loop(
input_handle={self._input_pipe.read_handle},
output_pipe=output_pipe,
load_handle={self._load_pipe.write_handle},
)
except:
sys.exit(1)
""").strip()
if self._proc.poll() is not None:
raise ValueError("Process has already exited.")
proc_stdin = self._proc.stdin
assert proc_stdin is not None
self._log_cmd(bootstrap_command)
# We need two newlines for Python to stop waiting for more input.
proc_stdin.write(f"{bootstrap_command}\n\n")
proc_stdin.flush()
with self.watch_stdout_stderr() as get_output:
try:
# Bootstrapping is very fast. (Unlike user code where we have
# no a priori expected upper bound.) If we don't get a response
# prior to the timeout, it is overwhelmingly likely that the
# worker died or the bootstrap failed. (E.g. failed to resolve
# import path.) This simply allows us to raise a good error.
bootstrap_pipe = subprocess_rpc.Pipe(
read_handle=self._output_pipe.read_handle,
write_handle=self._output_pipe.write_handle,
timeout=self._bootstrap_timeout,
)
result = bootstrap_pipe.read()
assert result == subprocess_rpc.BOOTSTRAP_IMPORT_SUCCESS, result
result = bootstrap_pipe.read()
assert result == subprocess_rpc.BOOTSTRAP_INPUT_LOOP_SUCCESS, result
self._worker_bootstrap_finished = True
assert self._proc.poll() is None
except (Exception, KeyboardInterrupt) as e:
stdout, stderr = get_output()
                cause = "import failed" if self._proc.poll() is not None else "timeout"
raise e from RuntimeError(
f"Failed to bootstrap worker ({cause}):\n"
f" working_dir: {self.working_dir}\n"
f" stdout:\n{textwrap.indent(stdout, ' ' * 8)}\n\n"
f" stderr:\n{textwrap.indent(stderr, ' ' * 8)}"
)

    def _log_cmd(self, snippet: str) -> None:
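        """Append `snippet` to the on-disk command log with a timestamp."""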
with open(self._command_log, "at", encoding="utf-8") as f:
now = datetime.datetime.now().strftime("[%Y-%m-%d] %H:%M:%S.%f")
f.write(f"# {now}\n{snippet}\n\n")

    @contextlib.contextmanager
def watch_stdout_stderr(self):
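        """Yield a callable returning stdout/stderr written since entry."""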
# Get initial state for stdout and stderr, since we only want to
# capture output since the contextmanager started.
stdout_stat = os.stat(self._stdout_f.name)
stderr_stat = os.stat(self._stderr_f.name)

        def get() -> typing.Tuple[str, str]:
with open(self._stdout_f.name, "rb") as f:
_ = f.seek(stdout_stat.st_size)
stdout = f.read().decode("utf-8").strip()
with open(self._stderr_f.name, "rb") as f:
_ = f.seek(stderr_stat.st_size)
stderr = f.read().decode("utf-8").strip()
return stdout, stderr

        yield get

    def _run(self, snippet: str) -> None:
"""Helper method for running code in a subprocess."""
assert self._worker_bootstrap_finished
assert self.alive, "Process has exited"
snippet = textwrap.dedent(snippet)
with self.watch_stdout_stderr() as get_output:
self._input_pipe.write(snippet.encode(subprocess_rpc.ENCODING))
self._log_cmd(snippet)
result = marshal.loads(self._output_pipe.read())
if isinstance(result, str):
assert result == subprocess_rpc.SUCCESS
return
assert isinstance(result, dict)
if not result:
stdout, stderr = get_output()
raise subprocess.SubprocessError(
"Uncaught Exception in worker:"
f" working_dir: {self.working_dir}\n"
f" stdout:\n{textwrap.indent(stdout, ' ' * 8)}\n\n"
f" stderr:\n{textwrap.indent(stderr, ' ' * 8)}")
serialized_e = subprocess_rpc.SerializedException(**result)
stdout, stderr = get_output()
subprocess_rpc.SerializedException.raise_from(
serialized_e=serialized_e,
extra_context=(
f" working_dir: {self.working_dir}\n"
f" stdout:\n{textwrap.indent(stdout, ' ' * 8)}\n\n"
f" stderr:\n{textwrap.indent(stderr, ' ' * 8)}"
)
)

    def _kill_proc(self) -> None:
"""Best effort to kill subprocess."""
if getattr(self, "_proc", None) is None:
# We failed in the constructor, so there's nothing to clean up.
return
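        # Ask the worker's run loop to exit on its own before escalating to
        # signals.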
self._input_pipe.write(subprocess_rpc.HARD_EXIT)
try:
self._proc.wait(timeout=1)
except subprocess.TimeoutExpired:
if not subprocess_rpc.IS_WINDOWS:
self._proc.send_signal(signal.SIGINT)
try:
self._proc.terminate()
except PermissionError:
# NoisePoliceWorker runs under sudo, and thus will not allow
# SIGTERM to be sent.
print(f"Failed to clean up process {self._proc.pid}")
# Unfortunately Popen does not clean up stdin when using PIPE. However
# we also can't unconditionally close the fd as it could interfere with
# the orderly teardown of the process. We try our best to kill
# `self._proc` in the previous block; if `self._proc` is terminated we
# make sure its stdin TextIOWrapper is closed as well.
try:
self._proc.wait(timeout=1)
proc_stdin = self._proc.stdin
if proc_stdin is not None:
proc_stdin.close()
except subprocess.TimeoutExpired:
pass
self._alive = False

    def __del__(self) -> None:
self._kill_proc()
# We own these fd's, and it seems that we can unconditionally close
# them without impacting the shutdown of `self._proc`.
self._stdout_f.close()
self._stderr_f.close()
# Finally, make sure we don't leak any files.
shutil.rmtree(self._working_dir, ignore_errors=True)