in occant_baselines/rl/occant_exp_trainer.py [0:0]
def _create_mapper_rollouts(self, ppo_cfg, ans_cfg):
    """Create the rollout storage that holds mapper training samples.

    Assembles a ``spaces.Dict`` describing one mapper sample (observations
    suffixed ``_at_t_1`` and ``_at_t``, poses, ego-maps and the action
    stored under ``action_at_t_1``) and hands it to
    ``MapLargeRolloutStorageMP``.

    Args:
        ppo_cfg: Not read by this method; kept so the signature stays
            unchanged for existing callers.
        ans_cfg: Config node. The fields read here are ``image_scale_hw``,
            ``MAPPER.map_size``, ``MAPPER.replay_size``,
            ``MAPPER.use_data_parallel``, ``MAPPER.gpu_ids`` and
            ``OCCUPANCY_ANTICIPATOR.type``.

    Returns:
        The newly constructed ``MapLargeRolloutStorageMP`` instance.
    """
    mapper_cfg = ans_cfg.MAPPER
    V = mapper_cfg.map_size
    imH, imW = ans_cfg.image_scale_hw

    # Each factory returns a *fresh* Box so that no two keys share a space
    # instance, exactly as when every Box was spelled out by hand.
    def image_space(channels):
        return spaces.Box(
            low=0.0, high=255.0, shape=(imH, imW, channels), dtype=np.float32
        )

    def ego_map_space():
        return spaces.Box(low=0.0, high=1.0, shape=(V, V, 2), dtype=np.float32)

    def pose_space():
        return spaces.Box(
            low=-100000.0, high=100000.0, shape=(3,), dtype=np.float32
        )

    # The anticipated ego-map space is taken from the first environment's
    # observation space rather than being declared here.
    anticipated_space = self.envs.observation_spaces[0].spaces[
        "ego_map_gt_anticipated"
    ]

    mapper_observation_space = {
        "rgb_at_t_1": image_space(3),
        "depth_at_t_1": image_space(1),
        "ego_map_gt_at_t_1": ego_map_space(),
        "pose_at_t_1": pose_space(),
        "pose_gt_at_t_1": pose_space(),
        "pose_hat_at_t_1": pose_space(),
        "rgb_at_t": image_space(3),
        "depth_at_t": image_space(1),
        "ego_map_gt_at_t": ego_map_space(),
        "pose_at_t": pose_space(),
        "pose_gt_at_t": pose_space(),
        "ego_map_gt_anticipated_at_t": anticipated_space,
        "action_at_t_1": spaces.Box(low=0, high=4, shape=(1,), dtype=np.int32),
    }
    # The "_at_t_1" anticipated map is only stored for the
    # "occant_ground_truth" anticipator type.
    if ans_cfg.OCCUPANCY_ANTICIPATOR.type == "occant_ground_truth":
        mapper_observation_space["ego_map_gt_anticipated_at_t_1"] = anticipated_space
    mapper_observation_space = spaces.Dict(mapper_observation_space)

    # Multiprocessing manager passed through to the rollout storage.
    mapper_manager = mp.Manager()

    # Default to the trainer's device; with data-parallel enabled and at
    # least one GPU id configured, use the first listed id instead.
    mapper_device = self.device
    if mapper_cfg.use_data_parallel and len(mapper_cfg.gpu_ids) > 0:
        mapper_device = mapper_cfg.gpu_ids[0]

    return MapLargeRolloutStorageMP(
        mapper_cfg.replay_size,
        mapper_observation_space,
        mapper_device,
        mapper_manager,
    )