def _create_mapper_rollouts()

in occant_baselines/rl/occant_exp_trainer.py [0:0]


    def _create_mapper_rollouts(self, ppo_cfg, ans_cfg):
        M = ans_cfg.overall_map_size
        V = ans_cfg.MAPPER.map_size
        s = ans_cfg.MAPPER.map_scale
        imH, imW = ans_cfg.image_scale_hw
        mapper_observation_space = {
            "rgb_at_t_1": spaces.Box(
                low=0.0, high=255.0, shape=(imH, imW, 3), dtype=np.float32
            ),
            "depth_at_t_1": spaces.Box(
                low=0.0, high=255.0, shape=(imH, imW, 1), dtype=np.float32
            ),
            "ego_map_gt_at_t_1": spaces.Box(
                low=0.0, high=1.0, shape=(V, V, 2), dtype=np.float32
            ),
            "pose_at_t_1": spaces.Box(
                low=-100000.0, high=100000.0, shape=(3,), dtype=np.float32
            ),
            "pose_gt_at_t_1": spaces.Box(
                low=-100000.0, high=100000.0, shape=(3,), dtype=np.float32
            ),
            "pose_hat_at_t_1": spaces.Box(
                low=-100000.0, high=100000.0, shape=(3,), dtype=np.float32
            ),
            "rgb_at_t": spaces.Box(
                low=0.0, high=255.0, shape=(imH, imW, 3), dtype=np.float32
            ),
            "depth_at_t": spaces.Box(
                low=0.0, high=255.0, shape=(imH, imW, 1), dtype=np.float32
            ),
            "ego_map_gt_at_t": spaces.Box(
                low=0.0, high=1.0, shape=(V, V, 2), dtype=np.float32
            ),
            "pose_at_t": spaces.Box(
                low=-100000.0, high=100000.0, shape=(3,), dtype=np.float32
            ),
            "pose_gt_at_t": spaces.Box(
                low=-100000.0, high=100000.0, shape=(3,), dtype=np.float32
            ),
            "ego_map_gt_anticipated_at_t": self.envs.observation_spaces[0].spaces[
                "ego_map_gt_anticipated"
            ],
            "action_at_t_1": spaces.Box(low=0, high=4, shape=(1,), dtype=np.int32),
        }
        if ans_cfg.OCCUPANCY_ANTICIPATOR.type == "occant_ground_truth":
            mapper_observation_space[
                "ego_map_gt_anticipated_at_t_1"
            ] = self.envs.observation_spaces[0].spaces["ego_map_gt_anticipated"]
        mapper_observation_space = spaces.Dict(mapper_observation_space)
        # Multiprocessing manager
        mapper_manager = mp.Manager()
        mapper_device = self.device
        if ans_cfg.MAPPER.use_data_parallel and len(ans_cfg.MAPPER.gpu_ids) > 0:
            mapper_device = ans_cfg.MAPPER.gpu_ids[0]
        mapper_rollouts = MapLargeRolloutStorageMP(
            ans_cfg.MAPPER.replay_size,
            mapper_observation_space,
            mapper_device,
            mapper_manager,
        )

        return mapper_rollouts