def main()

in pretrain.py


import logging
import os
import uuid

import hydra
import torch.multiprocessing as mp  # assumption; the stdlib multiprocessing exposes the same API
from omegaconf import DictConfig, OmegaConf

log = logging.getLogger(__name__)


@hydra.main(config_path='conf', config_name='pretrain')  # config location is an assumption
def main(cfg: DictConfig):
    log.info(f'** running from source tree at {hydra.utils.get_original_cwd()}')
    log.info(f'** running at {os.getcwd()}')
    log.info(f'** configuration:\n{OmegaConf.to_yaml(cfg, resolve=True)}')

    procs = []
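    # Unique file-based rendezvous point, used below as the process-group init_method.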
    rdvu_file = f'{cfg.distributed.rdvu_path}/rdvu-{uuid.uuid4()}'
    na = int(cfg.distributed.num_actors)
    nl = int(cfg.distributed.num_learners)
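    # Rebuild the distributed section with derived values; 'role' stays None as a per-process placeholder.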
    cfg.distributed = {
        'num_actors': na,
        'num_learners': nl,
        'size': na + nl,
        'init_method': f'file://{rdvu_file}',
        'role': None,
    }

    # Work is sharded evenly across learners, so each of these must divide by num_learners.
    if cfg.agent.batch_size % nl != 0:
        raise ValueError('Batch size must be a multiple of num_learners')
    if cfg.agent.samples_per_update % nl != 0:
        raise ValueError('Samples per update must be a multiple of num_learners')
    if cfg.agent.warmup_samples % nl != 0:
        raise ValueError('Warmup samples must be a multiple of num_learners')
    if cfg.env.train_procs % nl != 0:
        raise ValueError('Train procs must be a multiple of num_learners')

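    # One queue per learner; the barrier is sized for all na + nl processes.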
    queues = [mp.Queue() for _ in range(nl)]
    bcast_barrier = mp.Barrier(na + nl)
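    # Learners occupy ranks 0..nl-1; actors occupy ranks nl..nl+na-1.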
    rank = 0
    for _ in range(nl):
        p = mp.Process(
            target=worker, args=(rank, 'learner', queues, bcast_barrier, cfg)
        )
        procs.append(p)
        rank += 1
    for _ in range(na):
        p = mp.Process(
            target=worker, args=(rank, 'actor', queues, bcast_barrier, cfg)
        )
        procs.append(p)
        rank += 1
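    # Launch all processes, then block until every one has exited.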
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    # Best-effort cleanup of the file-based rendezvous point.
    try:
        os.remove(rdvu_file)
    except OSError:
        pass
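
worker is defined elsewhere in pretrain.py; main() only hands it a rank, a role string ('learner' or 'actor'), the learner queues, the shared barrier, and the config. A minimal sketch of what the launcher appears to assume worker does on startup (the torch.distributed call and the gloo backend are assumptions, not the actual implementation):

import torch.distributed as dist


def worker(rank, role, queues, bcast_barrier, cfg):
    # Record this process's role ('learner' or 'actor') in the config.
    cfg.distributed.role = role
    # Join the shared process group via the file:// rendezvous created by main().
    dist.init_process_group(
        backend='gloo',  # backend choice is an assumption
        init_method=cfg.distributed.init_method,
        world_size=cfg.distributed.size,
        rank=rank,
    )
    # All na + nl processes meet here, e.g. before an initial parameter broadcast.
    bcast_barrier.wait()
    ...

Assuming the usual Hydra entry point, the process counts and other sizing knobs can be overridden from the command line, e.g. python pretrain.py distributed.num_actors=8 distributed.num_learners=2.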