in pycls/core/distributed.py [0:0]
def multi_proc_run(num_proc, fun):
    """Run a single or multi GPU job locally on the current node."""
    launch = cfg.LAUNCH
    if launch.MODE in ["submitit_local", "slurm"]:
        # Launch fun() using submitit either locally or on SLURM
        use_slurm = launch.MODE == "slurm"
        executor = submitit.AutoExecutor if use_slurm else submitit.LocalExecutor
        kwargs = {"slurm_max_num_timeout": launch.MAX_RETRY} if use_slurm else {}
        executor = executor(folder=cfg.OUT_DIR, **kwargs)
        num_gpus_per_node = min(cfg.NUM_GPUS, cfg.MAX_GPUS_PER_NODE)
        executor.update_parameters(
            mem_gb=launch.MEM_PER_GPU * num_gpus_per_node,
            gpus_per_node=num_gpus_per_node,
            tasks_per_node=num_gpus_per_node,
            cpus_per_task=launch.CPUS_PER_GPU,
            nodes=max(1, cfg.NUM_GPUS // cfg.MAX_GPUS_PER_NODE),
            timeout_min=launch.TIME_LIMIT,
            name=launch.NAME,
            slurm_partition=launch.PARTITION,
            slurm_comment=launch.COMMENT,
            slurm_constraint=launch.GPU_TYPE,
            slurm_additional_parameters={"mail-user": launch.EMAIL, "mail-type": "END"},
        )
        main_port = random.randint(cfg.PORT_RANGE[0], cfg.PORT_RANGE[1])
        job = executor.submit(SubmititRunner(main_port, fun, cfg))
        print("Submitted job_id {} with out_dir: {}".format(job.job_id, cfg.OUT_DIR))
        if not use_slurm:
            job.wait()
    elif num_proc > 1:
        main_port = random.randint(cfg.PORT_RANGE[0], cfg.PORT_RANGE[1])
        mp_runner = torch.multiprocessing.start_processes
        args = (fun, main_port, cfg, num_proc)
        # Note: using "fork" below, "spawn" causes time and error regressions. Using
        # spawn changes the default multiprocessing context to spawn, which doesn't
        # interact well with the dataloaders (likely due to the use of OpenCV).
        mp_runner(single_proc_run, args=args, nprocs=num_proc, start_method="fork")
    else:
        fun()
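
For context, here is a minimal sketch of the two pieces this function hands work off to, neither of which appears in the excerpt above: a SubmititRunner-style callable for the submitit/SLURM branch and a single_proc_run-style worker for the fork branch. Both sketches are assumptions inferred from the call sites (executor.submit(SubmititRunner(main_port, fun, cfg)) and mp_runner(single_proc_run, args=args, ...)), not the module's actual implementations; torch.multiprocessing.start_processes invokes its target as fn(process_index, *args), which fixes the worker's signature.

# --- Sketch only: assumed shapes of the SubmititRunner and single_proc_run callables ---
import os

import submitit


class SubmititRunnerSketch(submitit.helpers.Checkpointable):
    """Hypothetical submitit payload: derive rendezvous info from the job env, then run fun()."""

    def __init__(self, port, fun, cfg_state):
        self.port, self.fun, self.cfg_state = port, fun, cfg_state

    def __call__(self):
        job_env = submitit.JobEnvironment()
        os.environ["MASTER_ADDR"] = job_env.hostnames[0]
        os.environ["MASTER_PORT"] = str(self.port)
        os.environ["RANK"] = str(job_env.global_rank)
        os.environ["LOCAL_RANK"] = str(job_env.local_rank)
        os.environ["WORLD_SIZE"] = str(job_env.num_tasks)
        # The real runner would also restore cfg_state and initialize
        # torch.distributed before handing control to fun().
        self.fun()


def single_proc_run_sketch(local_rank, fun, main_port, cfg_state, world_size):
    """Hypothetical per-GPU worker matching args = (fun, main_port, cfg, num_proc) above."""
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = str(main_port)
    os.environ["RANK"] = str(local_rank)
    os.environ["LOCAL_RANK"] = str(local_rank)
    os.environ["WORLD_SIZE"] = str(world_size)
    # As above, the real worker would restore cfg_state and call
    # torch.distributed.init_process_group before running fun().
    fun()

A launcher script would then invoke something like multi_proc_run(num_proc=cfg.NUM_GPUS, fun=train_model) (names illustrative), and the function dispatches to the submitit path, the local fork path, or a plain in-process call depending on cfg.LAUNCH.MODE and num_proc.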