def _start_processes()

in labgraph/runners/parallel_runner.py [0:0]


    def _start_processes(self) -> None:
        """Launch one subprocess per module in the graph and block on them.

        For each module this serializes its config and state (when present)
        to temporary pickle files, builds the CLI arguments the entry-point
        subprocess needs to reconstruct that module, and registers a
        ``ProcessInfo`` for it. The resulting ``ProcessManager`` is stored on
        ``self._process_manager`` and run (which blocks until the processes
        complete). Every temp file created here is recorded in
        ``self._temp_files`` for later cleanup.
        """
        # The stream topology and the runner options are identical for every
        # module, so serialize each exactly once up front instead of once per
        # loop iteration (the previous version rewrote both files for every
        # module, leaking redundant temp files).
        streams_by_topic_path = {}
        for stream in self._graph.__streams__.values():
            for topic_path in stream.topic_paths:
                streams_by_topic_path[topic_path] = stream.id
        with tempfile.NamedTemporaryFile("wb", delete=False) as streams_file:
            pickle.dump(streams_by_topic_path, streams_file)
        self._temp_files.append(streams_file.name)

        with tempfile.NamedTemporaryFile("wb", delete=False) as options_file:
            pickle.dump(self._options, options_file)
        self._temp_files.append(options_file.name)

        processes = []
        for module in self._modules:
            assert isinstance(module, Module)
            python_module = self._get_class_module(module.__class__)
            module_class_name = module.__class__.__name__
            get_module_class(python_module, module_class_name)  # Validate class

            process_args = ["--module", f"{python_module}.{module_class_name}"]

            # Config and state are per-module: serialize each (when set) as a
            # (qualified class name, field dict) pair so the subprocess can
            # rebuild the object.
            if module._config is not None:
                with tempfile.NamedTemporaryFile("wb", delete=False) as config_file:
                    pickle.dump(
                        (
                            self._get_class_qualname(module._config.__class__),
                            module._config.asdict(),
                        ),
                        config_file,
                    )
                self._temp_files.append(config_file.name)
                process_args += ["--config-file-path", config_file.name]

            if module.state is not None:
                with tempfile.NamedTemporaryFile("wb", delete=False) as state_file:
                    pickle.dump(
                        (
                            self._get_class_qualname(module.state.__class__),
                            dataclasses.asdict(module.state),
                        ),
                        state_file,
                    )
                self._temp_files.append(state_file.name)
                process_args += ["--state-file-path", state_file.name]

            # All subprocesses read the same (read-only) streams/options files.
            process_args += ["--streams-file-path", streams_file.name]
            process_args += ["--options-file-path", options_file.name]

            # Choose the stream namespace: the graph itself runs unqualified,
            # loggers use the dedicated logger key, and ordinary modules are
            # namespaced by their path within the graph.
            if module is self._graph:
                module_path = ""
            elif isinstance(module, Logger):
                module_path = LOGGER_KEY
            else:
                module_path = self._graph._get_module_path(module)
                process_args += ["--stream-namespace", module_path]
            processes.append(
                ProcessInfo(
                    name=module_path or module.__class__.__name__,
                    # Subprocesses run the sibling `entry` module of this
                    # package as their entry point.
                    module=__name__.replace("parallel_runner", "entry"),
                    args=tuple(process_args),
                )
            )

        self._process_manager = ProcessManager(processes=processes)
        self._process_manager.run()