in pretrain.py [0:0]
# Imports this snippet relies on; the `worker` target and the
# @hydra.main entry-point wrapper are defined elsewhere in pretrain.py.
# Plain multiprocessing is assumed here; if torch tensors travel over
# the queues, torch.multiprocessing (same Queue/Barrier/Process API)
# is the likelier original.
import logging
import multiprocessing as mp
import os
import uuid

import hydra
from omegaconf import DictConfig, OmegaConf

log = logging.getLogger(__name__)


def main(cfg: DictConfig):
    log.info(f'** running from source tree at {hydra.utils.get_original_cwd()}')
    log.info(f'** running at {os.getcwd()}')
    log.info(f'** configuration:\n{OmegaConf.to_yaml(cfg, resolve=True)}')
    procs = []
    # Unique rendezvous file consumed by the file:// init_method below.
    rdvu_file = f'{cfg.distributed.rdvu_path}/rdvu-{uuid.uuid4()}'
    na = int(cfg.distributed.num_actors)
    nl = int(cfg.distributed.num_learners)
    # Rewrite the distributed config with the derived values; each worker
    # later fills in its own role.
    cfg.distributed = {
        'num_actors': na,
        'num_learners': nl,
        'size': na + nl,
        'init_method': f'file://{rdvu_file}',
        'role': None,
    }
    # Everything sharded across learners must divide evenly.
    if cfg.agent.batch_size % nl != 0:
        raise ValueError('Batch size must be a multiple of num_learners')
    if cfg.agent.samples_per_update % nl != 0:
        raise ValueError('Samples per update must be a multiple of num_learners')
    if cfg.agent.warmup_samples % nl != 0:
        raise ValueError('Warmup samples must be a multiple of num_learners')
    if cfg.env.train_procs % nl != 0:
        raise ValueError('Train procs must be a multiple of num_learners')
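    # Example: with nl = 4 learners, batch_size = 128 shards evenly to 32
    # per learner, while batch_size = 130 would fail the check above.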
    # One queue per learner, shared with every actor, plus one barrier
    # across all na + nl processes for synchronized weight broadcasts.
    queues = [mp.Queue() for _ in range(nl)]
    bcast_barrier = mp.Barrier(na + nl)
    # Learners take ranks [0, nl); actors take ranks [nl, nl + na).
    rank = 0
    for _ in range(nl):
        p = mp.Process(
            target=worker, args=(rank, 'learner', queues, bcast_barrier, cfg)
        )
        procs.append(p)
        rank += 1
    for _ in range(na):
        p = mp.Process(
            target=worker, args=(rank, 'actor', queues, bcast_barrier, cfg)
        )
        procs.append(p)
        rank += 1
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    # Best-effort cleanup of the rendezvous file.
    try:
        os.remove(rdvu_file)
    except OSError:
        pass
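
# For reference, a minimal sketch of the `worker` target this launcher
# assumes. The real worker in pretrain.py is not shown here; the backend
# choice, the role dispatch, and the run_learner/run_actor helpers are
# assumptions for illustration only.
import torch.distributed as dist


def worker(rank, role, queues, bcast_barrier, cfg):
    # Record this process's role, then join the file:// rendezvous that
    # main() wrote into cfg.distributed.init_method.
    cfg.distributed.role = role
    dist.init_process_group(
        backend='gloo',  # assumption: the source does not show the backend
        init_method=cfg.distributed.init_method,
        rank=rank,
        world_size=cfg.distributed.size,
    )
    if role == 'learner':
        run_learner(queues[rank], bcast_barrier, cfg)  # hypothetical helper
    else:
        run_actor(queues, bcast_barrier, cfg)  # hypothetical helper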