in pretrain.py:
import logging

import torch as th
import torch.distributed as dist
from omegaconf import DictConfig, OmegaConf

import hucc

# setup_training_mfdim, restore, eval_mfdim, checkpoint,
# train_loop_mfdim_actor, and train_loop_mfdim_learner are assumed to be
# defined elsewhere in pretrain.py.

log = logging.getLogger(__name__)


def worker(rank, role, queues, bcast_barrier, cfg: DictConfig):
    # Pin each process to its own GPU if CUDA is available.
    if th.cuda.is_available():
        th.cuda.set_device(rank)
    log.info(
        f'Creating process group of size {cfg.distributed.size} via {cfg.distributed.init_method} [rank={rank}]'
    )
    dist.init_process_group(
        backend='nccl' if th.cuda.is_available() else 'gloo',
        rank=rank,
        world_size=cfg.distributed.size,
        init_method=cfg.distributed.init_method,
    )
    cfg.distributed.role = role
    if role == 'learner':
        # Learner processes split the environment and training workload
        # evenly among themselves.
        OmegaConf.set_struct(cfg.env, False)
        cfg.env.args.fork = False
        cfg.env.eval_procs = 1
        cfg.env.train_procs //= cfg.distributed.num_learners
        cfg.agent.batch_size //= cfg.distributed.num_learners
        cfg.agent.samples_per_update //= cfg.distributed.num_learners
        cfg.agent.warmup_samples //= cfg.distributed.num_learners
    try:
        setup = setup_training_mfdim(cfg)
    except Exception:
        # Log with traceback before re-raising; this block covers setup,
        # not the training loop itself.
        log.exception('Error in setup')
        raise
    setup.queues = queues
    agent = setup.agent
    agent.bcast_barrier = bcast_barrier
    bcast_barrier.wait()
    if cfg.distributed.num_learners > 1:
        # Dedicated process group for collectives among learner ranks only.
        learner_group = dist.new_group(list(range(cfg.distributed.num_learners)))
        agent.learner_group = learner_group
    cp_path = cfg.checkpoint_path
    if cfg.init_model_from:
        # Warm-start model weights and the agent's _log_alpha values from an
        # existing checkpoint.
        log.info(f'Initializing model from checkpoint {cfg.init_model_from}')
        with open(cfg.init_model_from, 'rb') as fd:
            data = th.load(fd)
        setup.model.load_state_dict(data['_model'])
        agent._log_alpha.clear()
        for k, v in data['_log_alpha'].items():
            agent._log_alpha[k] = v
    restore(setup)
    log.debug(f'broadcast params {rank}:{role}')
    bcast_barrier.wait()
    # Synchronize model parameters across all ranks, using the first actor
    # rank (rank == num_learners) as the source.
    for p in setup.model.parameters():
        dist.broadcast(p, src=cfg.distributed.num_learners)
    dist.barrier()
    log.debug('done')
    setup.eval_fn = eval_mfdim
    agent.role = role
    try:
        if role == 'actor':
            hucc.set_checkpoint_fn(checkpoint, setup)
            train_loop_mfdim_actor(setup)
        else:
            log.debug(f'start learner with queue {rank}')
            train_loop_mfdim_learner(setup, setup.queues[rank])
    except Exception:
        log.exception('Error in training loop')
        raise
    finally:
        setup.close()
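
For context, here is a minimal sketch of how worker might be spawned. This launcher is an assumption and not part of pretrain.py: it infers from the broadcast source above (src=cfg.distributed.num_learners) that ranks below num_learners take the 'learner' role and the remaining ranks act as actors, and it guesses the queue and barrier setup from how worker uses them.

import torch.multiprocessing as mp

def launch(cfg):
    # Hypothetical entry point: one process per rank, learners on ranks
    # [0, num_learners) and actors on the rest.
    size = cfg.distributed.size
    n_learners = cfg.distributed.num_learners
    ctx = mp.get_context('spawn')  # 'spawn' is required for CUDA workers
    queues = [ctx.SimpleQueue() for _ in range(n_learners)]  # one per learner rank
    bcast_barrier = ctx.Barrier(size)  # all ranks rendezvous here twice in worker()
    procs = []
    for rank in range(size):
        role = 'learner' if rank < n_learners else 'actor'
        p = ctx.Process(target=worker, args=(rank, role, queues, bcast_barrier, cfg))
        p.start()
        procs.append(p)
    for p in procs:
        p.join()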