def main()

in occant_baselines/generate_topdown_maps/generate_environment_layouts.py [0:0]


def main(args):
    """Build and save a ground-truth global map for every dataset episode.

    For each episode the environment is reset, a top-down map of the scene is
    sampled on a regular grid, and at every sampled cell whose map value is
    positive the agent is placed at six headings. The ``ego_map_gt``
    observation from each pose is registered into a global map using the
    ``pose_gt`` observation, and the final map is written to
    ``<save_dir>/episode_id_<episode_id>.npy`` in (H, W, C) layout.

    Args:
        args: Namespace-like object providing ``global_map_size`` (side
            length M of the square global map), ``config_path`` (path handed
            to ``habitat_extensions.get_extended_config``) and ``save_dir``
            (output directory, created via ``safe_mkdir``).
    """
    # The default config is only used to build the mapper; the task config
    # loaded from ``args.config_path`` below drives the environment.
    mapper_config = get_config().RL.ANS.MAPPER
    mapper_config.defrost()
    mapper_config.map_size = 65
    mapper_config.map_scale = 0.05
    mapper_config.freeze()

    mapper = Mapper(mapper_config, None)

    M = args.global_map_size

    save_dir = args.save_dir
    safe_mkdir(save_dir)

    config = habitat_extensions.get_extended_config(args.config_path)

    dataset_path = config.DATASET.DATA_PATH.replace("{split}", config.DATASET.SPLIT)
    with gzip.open(dataset_path, "rt") as fp:
        dataset = json.load(fp)

    num_episodes = len(dataset["episodes"])

    env = DummyRLEnv(config=config)
    env.seed(1234)
    # Prefer the first GPU, but fall back to CPU instead of crashing when
    # CUDA is unavailable.
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

    # Episode-independent quantities, computed once.
    grid_size = config.TASK.GT_EGO_MAP.MAP_SCALE
    coordinate_max = maps.COORDINATE_MAX
    coordinate_min = maps.COORDINATE_MIN
    resolution = int((coordinate_max - coordinate_min) / grid_size)
    grid_resolution = (resolution, resolution)
    # Sample one map cell every ``stride`` cells (1.0 / grid_size cells,
    # but never fewer than one).
    stride = max(int(1.0 / grid_size), 1)

    # Six headings, 60 degrees apart, as 4-element rotations with
    # sin(theta / 2) in slot 1 and cos(theta / 2) in slot 3
    # (presumably an [x, y, z, w] quaternion about the vertical axis).
    rotations = [
        [0.0, np.sin(theta / 2).item(), 0.0, np.cos(theta / 2).item()]
        for theta in np.arange(-np.pi, np.pi, np.pi / 3.0)
    ]

    # Nothing here is back-propagated, so skip autograd bookkeeping while the
    # global map is updated repeatedly.
    with torch.no_grad():
        for _ in tqdm.tqdm(range(num_episodes)):
            env.reset()
            sim = env.habitat_env.sim
            task = env.habitat_env.task
            episode = env.habitat_env.current_episode

            # Initialize a global map for the episode
            global_map = torch.zeros(1, 2, M, M).to(device)

            top_down_map = maps.get_topdown_map(sim, grid_resolution, 20000)
            extent_0, extent_1 = top_down_map.shape

            # Regular grid of candidate cells, flattened to (N, 2) index pairs.
            idx_0 = np.arange(0, extent_0, stride, dtype=int)
            idx_1 = np.arange(0, extent_1, stride, dtype=int)
            cells = np.stack(np.meshgrid(idx_0, idx_1), axis=2).reshape(-1, 2)
            # Keep only cells with a positive map value (presumably the
            # non-obstacle cells of the top-down map).
            valid_cells = cells[top_down_map[cells[:, 0], cells[:, 1]] > 0]

            # Convert map indices to simulator coordinates.
            real_x_vals = (coordinate_max - valid_cells[:, 0] * grid_size).tolist()
            real_z_vals = (coordinate_min + valid_cells[:, 1] * grid_size).tolist()
            # Height is read once, before the agent is moved around below.
            start_y = float(sim.get_agent_state().position[1])

            for real_x, real_z in zip(real_x_vals, real_z_vals):
                position = [real_x, start_y, real_z]
                for rotation in rotations:
                    sim_obs = sim.get_observations_at(
                        position, rotation, keep_agent_at_new_pose=True
                    )
                    obs = task.sensor_suite.get_observations(
                        observations=sim_obs, episode=episode, task=task
                    )
                    ego_map_gt = torch.Tensor(obs["ego_map_gt"]).to(device)
                    ego_map_gt = rearrange(ego_map_gt, "h w c -> () c h w")
                    pose_gt = torch.Tensor(obs["pose_gt"]).unsqueeze(0).to(device)
                    global_map = mapper.ext_register_map(
                        global_map, ego_map_gt, pose_gt
                    )

            # Save data
            global_map_np = asnumpy(rearrange(global_map, "b c h w -> b h w c")[0])
            episode_id = episode.episode_id
            np.save(
                os.path.join(save_dir, f"episode_id_{episode_id}.npy"), global_map_np
            )