in labgraph/runners/parallel_runner.py [0:0]
def _start_processes(self) -> None:
processes = []
for module in self._modules:
assert isinstance(module, Module)
python_module = self._get_class_module(module.__class__)
module_class_name = module.__class__.__name__
get_module_class(python_module, module_class_name) # Validate class
process_args = ["--module", f"{python_module}.{module_class_name}"]
# Write config and state to disk for subprocess to use
if module._config is not None:
with tempfile.NamedTemporaryFile("wb", delete=False) as config_file:
pickle.dump(
(
self._get_class_qualname(module._config.__class__),
module._config.asdict(),
),
config_file,
)
self._temp_files.append(config_file.name)
process_args += ["--config-file-path", config_file.name]
if module.state is not None:
with tempfile.NamedTemporaryFile("wb", delete=False) as state_file:
pickle.dump(
(
self._get_class_qualname(module.state.__class__),
dataclasses.asdict(module.state),
),
state_file,
)
self._temp_files.append(state_file.name)
process_args += ["--state-file-path", state_file.name]
# Write stream information to disk
streams_by_topic_path = {}
for stream in self._graph.__streams__.values():
for topic_path in stream.topic_paths:
streams_by_topic_path[topic_path] = stream.id
with tempfile.NamedTemporaryFile("wb", delete=False) as streams_file:
pickle.dump(streams_by_topic_path, streams_file)
self._temp_files.append(streams_file.name)
process_args += ["--streams-file-path", streams_file.name]
# Write runner options to disk
with tempfile.NamedTemporaryFile("wb", delete=False) as options_file:
pickle.dump(self._options, options_file)
self._temp_files.append(options_file.name)
process_args += ["--options-file-path", options_file.name]
if module is self._graph:
module_path = ""
elif isinstance(module, Logger):
module_path = LOGGER_KEY
else:
module_path = self._graph._get_module_path(module)
process_args += ["--stream-namespace", module_path]
processes.append(
ProcessInfo(
name=module_path or module.__class__.__name__,
module=__name__.replace("parallel_runner", "entry"),
args=tuple(process_args),
)
)
self._process_manager = ProcessManager(processes=processes)
self._process_manager.run()