# main()
# Source: occant_baselines/generate_topdown_maps/generate_occant_gt_maps.py

def main(args):
    """Generate ground-truth top-down maps (seen-area and wall maps) for every
    (scene, floor) pair in the dataset split referenced by ``args.config_path``.

    For each floor of each scene, one episode is rolled out to produce a
    global seen-area map and a wall map (via ``get_episode_map``); both are
    saved as ``.npy`` files, and a JSON index describing all maps is written
    to ``<save_dir>/all_maps_info.json``.

    Args:
        args: Namespace with attributes ``global_map_size`` (int, side of the
            global map in cells), ``config_path`` (habitat config yaml) and
            ``save_dir`` (output directory, created if missing).
    """
    config = get_config()

    # Configure the local egocentric mapper used by get_episode_map().
    mapper_config = config.RL.ANS.MAPPER
    mapper_config.defrost()
    mapper_config.map_size = 65
    mapper_config.map_scale = 0.05
    mapper_config.freeze()

    mapper = Mapper(mapper_config, None)

    M = args.global_map_size

    config_path = args.config_path
    save_dir = args.save_dir
    safe_mkdir(save_dir)

    seen_map_save_root = os.path.join(save_dir, "seen_area_maps")
    wall_map_save_root = os.path.join(save_dir, "wall_maps")
    json_save_path = os.path.join(save_dir, "all_maps_info.json")

    config = habitat_extensions.get_extended_config(config_path)

    dataset_path = config.DATASET.DATA_PATH.replace("{split}", config.DATASET.SPLIT)
    with gzip.open(dataset_path, "rt") as fp:
        dataset = json.load(fp)

    num_episodes = len(dataset["episodes"])

    print("===============> Loading data per scene")
    scene_to_data = {}
    if num_episodes == 0:
        # Episodes are stored per-scene in a "content" directory next to the
        # split file rather than inline in the split json.
        content_path = os.path.join(
            dataset_path[: -len(f"{config.DATASET.SPLIT}.json.gz")], "content"
        )
        scene_paths = glob.glob(f"{content_path}/*")
        print(f"Number of scenes found: {len(scene_paths)}")
        for scene_data_path in scene_paths:
            with gzip.open(scene_data_path, "rt") as fp:
                scene_data = json.load(fp)
            num_episodes += len(scene_data["episodes"])
            scene_id = scene_data["episodes"][0]["scene_id"].split("/")[-1]
            scene_to_data[scene_id] = scene_data["episodes"]
    else:
        # Group inline episodes by scene.
        for ep in dataset["episodes"]:
            scene_id = ep["scene_id"].split("/")[-1]
            scene_to_data.setdefault(scene_id, []).append(ep)

    print("===============> Computing heights for different floors in each scene")
    scenes_to_floor_heights = {}
    for scene_id, scene_data in scene_to_data.items():
        # Cluster episode start heights into floors: a height within 0.5m of
        # an existing floor belongs to that floor; otherwise it starts a new
        # one. (np.any over an empty array is False, so the first height is
        # always appended.)
        floor_heights = []
        for ep in scene_data:
            height = ep["start_position"][1]
            d2floors = np.array([abs(x - height) for x in floor_heights])
            if not np.any(d2floors < 0.5):
                floor_heights.append(height)
        scenes_to_floor_heights[scene_id] = floor_heights

    env = DummyRLEnv(config=config)
    env.seed(1234)
    device = torch.device("cuda:0")

    safe_mkdir(seen_map_save_root)
    safe_mkdir(wall_map_save_root)

    # Data format for saving top-down maps per scene:
    # For each split, create a json file that contains the following dictionary:
    # key - scene_id
    # value - [{'floor_height': ...,
    #           'seen_map_path': ...,
    #           'wall_map_path': ...,
    #           'world_position': ...,
    #           'world_heading': ...},
    #          .,
    #          .,
    #          .,
    #         ]
    # The floor_height specifies a single height value on that floor.
    # All other heights within 0.5m of this height will correspond to this floor.
    # The *_map_path specifies the path to a .npy file that contains the
    # corresponding map. This map is in the world coordinate system, not episode
    # centric start-view coordinate system.
    # The world_position is the (X, Y, Z) position of the agent w.r.t. which this
    # map was computed. The world_heading is the clockwise rotation (-Z to X)
    # of the agent in the world coordinates.
    # The .npy files will be stored in seen_map_save_root and wall_map_save_root.

    # Create top-down maps per scene, per floor
    per_scene_per_floor_maps = {}
    for i in tqdm.tqdm(range(num_episodes)):

        _ = env.reset()

        scene_id = env.habitat_env.current_episode.scene_id.split("/")[-1]
        agent_state = env.habitat_env.sim.get_agent_state()
        start_position = np.array(agent_state.position)
        # Clockwise rotation
        start_heading = compute_heading_from_quaternion(agent_state.rotation)
        start_height = start_position[1].item()
        # Match the episode's start height to one of the pre-computed floors.
        floor_heights = scenes_to_floor_heights[scene_id]
        d2floors = np.array([abs(x - start_height) for x in floor_heights])
        floor_idx = np.where(d2floors < 0.5)[0][0].item()

        if scene_id not in per_scene_per_floor_maps:
            per_scene_per_floor_maps[scene_id] = {}

        # If the maps for this floor were already computed, skip the episode
        if floor_idx in per_scene_per_floor_maps[scene_id]:
            continue

        global_seen_map, global_wall_map = get_episode_map(
            env, mapper, M, config, device
        )
        seen_map_save_path = f"{seen_map_save_root}/{scene_id}_{floor_idx}.npy"
        wall_map_save_path = f"{wall_map_save_root}/{scene_id}_{floor_idx}.npy"
        np.save(seen_map_save_path, global_seen_map)
        np.save(wall_map_save_path, global_wall_map)
        per_scene_per_floor_maps[scene_id][floor_idx] = {
            "floor_height": start_height,
            "seen_map_path": seen_map_save_path,
            "wall_map_path": wall_map_save_path,
            "world_position": start_position.tolist(),
            "world_heading": start_heading,
        }

    # Flatten {scene: {floor_idx: data}} to {scene: [data, ...]} for the index.
    save_json = {
        scene: list(floor_maps.values())
        for scene, floor_maps in per_scene_per_floor_maps.items()
    }

    # Use a context manager so the file is flushed and closed deterministically
    # (the original `json.dump(..., open(...))` leaked the handle).
    with open(json_save_path, "w") as fp:
        json.dump(save_json, fp)