in src/sagemaker_training/smdataparallel.py [0:0]
def run(self, wait=True, capture_error=False):
    """Run the SMDataParallel training process.

    Args:
        wait (bool): A boolean indicating whether to wait and check for errors.
            Defaults to True.
        capture_error (bool): A boolean indicating whether to direct stderr to a stream
            that can later be read. Defaults to False.

    Returns:
        process (subprocess.Popen): The spawned process.
    """
    self._setup()
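
    # Build the distributed launcher command, then append the entry-point command
    # produced by the parent runner class.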
    cmd = self._create_command()
    cmd.extend(super(SMDataParallelRunner, self)._create_command())
    logging_config.log_script_invocation(cmd, self._env_vars)
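
    # Exception classes that a failing process should be mapped to:
    # debugger-related and SMDataParallel-related errors.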
    exception_classes = []
    exception_classes += process.get_debugger_exception_classes()
    exception_classes += get_dataparallel_exception_classes()
    # remove potential duplication
    exception_classes = list(set(exception_classes))
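
    # With wait=True, block until the process exits and raise one of the collected
    # exception classes on failure; otherwise just spawn the process and return it.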
    if wait:
        process_spawned = process.check_error(
            cmd,
            exception_classes,
            self._processes_per_host,
            capture_error=capture_error,
            cwd=environment.code_dir,
        )
    else:
        process_spawned = process.create(
            cmd,
            exception_classes,
            self._processes_per_host,
            capture_error=capture_error,
            cwd=environment.code_dir,
        )
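
    # Notify every worker node that training has finished on the leader by writing
    # a status file onto it, retrying a few times if the write fails.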
logger.info("Begin writing status file from leader node to worker nodes")
# Write status file to all nodes
status_file = MPI_FINISHED_STATUS_FILE + "." + self._master_hostname
for host in self._hosts:
if host != self._master_hostname:
status = _write_status_file(host, status_file)
retry_count = 5 if not status else 0
while not status:
if retry_count == 0:
break
logger.info(f"Retry creating status file onto {host}")
retry_count -= 1
time.sleep(1)
status = _write_status_file(host, status_file)
if not status:
logger.info(f"Failed to create status file onto {host}")
    time.sleep(30)
    logger.info("Finished writing status file from leader node to worker nodes")

    self._tear_down()
    return process_spawned