in optimum_benchmark/launchers/torchrun/launcher.py [0:0]
def launch(self, worker: Callable[..., BenchmarkReport], worker_args: List[Any]) -> BenchmarkReport:
    """Run ``worker`` in a separate process and return the aggregated report.

    A ``Process`` running ``target`` is started with one end of a ``Pipe``.
    The parent synchronizes with the child twice, waits until the child
    either sends something over the pipe or exits, then decodes the response.

    Args:
        worker: Callable forwarded to the child process.
        worker_args: Arguments forwarded alongside ``worker``.

    Returns:
        The result of ``BenchmarkReport.aggregate_across_processes`` applied
        to the per-rank reports received from the child.

    Raises:
        RuntimeError: If the child is not alive at a synchronization point,
            exits with a non-zero code, sends no response, or sends a
            response that is neither a ``str`` nor a ``dict``.
        ChildProcessError: If the child (or one of its ranks) sent a string
            response, which is treated as a traceback.
    """
    parent_connection, child_connection = Pipe()
    main_process_pid = os.getpid()
    isolated_process = Process(
        target=target,
        args=(worker, worker_args, child_connection, main_process_pid, self.launch_config, self.logger),
        daemon=False,
    )

    # Seconds to block in each poll() call while waiting for the child. poll()
    # returns as soon as data is available, so this only bounds how quickly a
    # child that died without responding is noticed.
    poll_interval = 0.1

    with ExitStack() as stack:
        if self.config.numactl:
            stack.enter_context(self.numactl_executable())

        isolated_process.start()

        # First handshake, before device isolation is (optionally) set up.
        # NOTE(review): the exact protocol lives in sync_with_child, which is
        # not visible here.
        if isolated_process.is_alive():
            sync_with_child(parent_connection)
        else:
            raise RuntimeError("Could not synchronize with isolated process")

        if self.config.device_isolation:
            stack.enter_context(self.device_isolation(isolated_process.pid))

        # Second handshake, after device isolation is (optionally) in place.
        if isolated_process.is_alive():
            sync_with_child(parent_connection)
        else:
            raise RuntimeError("Could not synchronize with isolated process")

        # Wait until the child responds or exits. Blocking in poll() with a
        # timeout avoids spinning a CPU core for the whole run.
        while isolated_process.is_alive() and not parent_connection.poll(poll_interval):
            pass

    if not isolated_process.is_alive() and isolated_process.exitcode is not None and isolated_process.exitcode != 0:
        raise RuntimeError(f"Isolated process exited with non-zero code {isolated_process.exitcode}")

    if parent_connection.poll():
        response = parent_connection.recv()
    else:
        raise RuntimeError("Isolated process did not send any response")

    # A string located under the temp directory is treated as the path of a
    # pickled response rather than the response itself.
    # NOTE(review): unpickling can execute arbitrary code; this assumes the
    # file was written by the child process started above -- confirm.
    if isinstance(response, str) and response.startswith(tempfile.gettempdir()):
        with open(response, "rb") as response_file:
            response = pickle.load(response_file)

    if isinstance(response, str):
        self.logger.error("\t+ Received traceback from isolated process")
        raise ChildProcessError(response)

    if not isinstance(response, dict):
        # Fail loudly instead of falling through with no report to return.
        raise RuntimeError(
            f"Received an unexpected response of type {type(response).__name__} from isolated process"
        )

    self.logger.info("\t+ Received outputs from isolated process")
    reports = []
    for rank, report_dict in response.items():
        # A string entry for a rank is that rank's traceback.
        if isinstance(report_dict, str):
            self.logger.error(f"\t+ Received traceback from rank process {rank}")
            raise ChildProcessError(report_dict)

        self.logger.info(f"\t+ Received report from rank process {rank}")
        reports.append(BenchmarkReport.from_dict(report_dict))

    self.logger.info("\t+ Aggregating reports from all rank processes")
    return BenchmarkReport.aggregate_across_processes(reports)