in occant_baselines/generate_topdown_maps/generate_occant_gt_maps.py [0:0]
def _compute_floor_heights(episodes, tolerance=0.5):
    """Return one representative height per distinct floor among *episodes*.

    Each episode's start height (``ep["start_position"][1]``) is compared
    against the heights collected so far; it starts a new floor only when it
    is not within *tolerance* of any of them. The first height seen on a
    floor is the one that represents it.
    """
    floor_heights = []
    for ep in episodes:
        height = ep["start_position"][1]
        if not any(abs(floor - height) < tolerance for floor in floor_heights):
            floor_heights.append(height)
    return floor_heights


def _find_floor_index(floor_heights, height, tolerance=0.5):
    """Return the index of the first floor within *tolerance* of *height*.

    Raises:
        IndexError: if no entry of *floor_heights* is close enough.
    """
    for idx, floor in enumerate(floor_heights):
        if abs(floor - height) < tolerance:
            return idx
    raise IndexError(
        f"height {height} is not within {tolerance} of any known floor "
        f"height {floor_heights}"
    )


def main(args):
    """Generate one top-down seen-area map and wall map per scene floor.

    Reads ``args.global_map_size``, ``args.config_path`` and
    ``args.save_dir``. Under ``args.save_dir`` it writes:

    * ``seen_area_maps/<scene_id>_<floor_idx>.npy``
    * ``wall_maps/<scene_id>_<floor_idx>.npy``
    * ``all_maps_info.json`` mapping each scene id to a list of per-floor
      records (format described in the comment inside the function).
    """
    config = get_config()
    mapper_config = config.RL.ANS.MAPPER
    mapper_config.defrost()
    mapper_config.map_size = 65
    mapper_config.map_scale = 0.05
    mapper_config.freeze()
    mapper = Mapper(mapper_config, None)

    M = args.global_map_size
    config_path = args.config_path
    save_dir = args.save_dir
    safe_mkdir(save_dir)
    seen_map_save_root = os.path.join(save_dir, "seen_area_maps")
    wall_map_save_root = os.path.join(save_dir, "wall_maps")
    json_save_path = os.path.join(save_dir, "all_maps_info.json")

    # From here on `config` is the environment config, not the mapper one.
    config = habitat_extensions.get_extended_config(config_path)

    dataset_path = config.DATASET.DATA_PATH.replace("{split}", config.DATASET.SPLIT)
    with gzip.open(dataset_path, "rt") as fp:
        dataset = json.load(fp)
    num_episodes = len(dataset["episodes"])

    print("===============> Loading data per scene")
    scene_to_data = {}
    if num_episodes == 0:
        # The split file holds no episodes itself; they are stored in
        # per-scene files inside a sibling "content" directory.
        content_path = os.path.join(
            dataset_path[: -len(f"{config.DATASET.SPLIT}.json.gz")], "content"
        )
        scene_paths = glob.glob(f"{content_path}/*")
        print(f"Number of scenes found: {len(scene_paths)}")
        for scene_data_path in scene_paths:
            with gzip.open(scene_data_path, "rt") as fp:
                scene_data = json.load(fp)
            episodes = scene_data["episodes"]
            if not episodes:
                # Nothing to derive a scene id from; skip instead of
                # failing on episodes[0].
                continue
            num_episodes += len(episodes)
            scene_id = episodes[0]["scene_id"].split("/")[-1]
            scene_to_data[scene_id] = episodes
    else:
        for ep in dataset["episodes"]:
            scene_id = ep["scene_id"].split("/")[-1]
            scene_to_data.setdefault(scene_id, []).append(ep)

    print("===============> Computing heights for different floors in each scene")
    scenes_to_floor_heights = {
        scene_id: _compute_floor_heights(episodes)
        for scene_id, episodes in scene_to_data.items()
    }

    env = DummyRLEnv(config=config)
    env.seed(1234)
    device = torch.device("cuda:0")
    safe_mkdir(seen_map_save_root)
    safe_mkdir(wall_map_save_root)

    # Data format for saving top-down maps per scene:
    # For each split, create a json file that contains the following dictionary:
    # key - scene_id
    # value - [{'floor_height': ...,
    #           'seen_map_path': ...,
    #           'wall_map_path': ...,
    #           'world_position': ...,
    #           'world_heading': ...},
    #          .,
    #          .,
    #          .,
    #         ]
    # The floor_height specifies a single height value on that floor.
    # All other heights within 0.5m of this height will correspond to this floor.
    # The *_map_path specifies the path to a .npy file that contains the
    # corresponding map. This map is in the world coordinate system, not episode
    # centric start-view coordinate system.
    # The world_position is the (X, Y, Z) position of the agent w.r.t. which this
    # map was computed. The world_heading is the clockwise rotation (-Z to X)
    # of the agent in the world coordinates.
    # The .npy files will be stored in seen_map_save_root and wall_map_save_root.

    # Create top-down maps per scene, per floor
    per_scene_per_floor_maps = {}
    for _ in tqdm.tqdm(range(num_episodes)):
        env.reset()
        scene_id = env.habitat_env.current_episode.scene_id.split("/")[-1]
        agent_state = env.habitat_env.sim.get_agent_state()
        start_position = np.array(agent_state.position)
        # Clockwise rotation
        start_heading = compute_heading_from_quaternion(agent_state.rotation)
        start_height = start_position[1].item()
        floor_idx = _find_floor_index(scenes_to_floor_heights[scene_id], start_height)

        floor_maps = per_scene_per_floor_maps.setdefault(scene_id, {})
        # If the maps for this floor were already computed, skip the episode
        if floor_idx in floor_maps:
            continue

        global_seen_map, global_wall_map = get_episode_map(
            env, mapper, M, config, device
        )
        seen_map_save_path = f"{seen_map_save_root}/{scene_id}_{floor_idx}.npy"
        wall_map_save_path = f"{wall_map_save_root}/{scene_id}_{floor_idx}.npy"
        np.save(seen_map_save_path, global_seen_map)
        np.save(wall_map_save_path, global_wall_map)
        floor_maps[floor_idx] = {
            "floor_height": start_height,
            "seen_map_path": seen_map_save_path,
            "wall_map_path": wall_map_save_path,
            "world_position": start_position.tolist(),
            "world_heading": start_heading,
        }

    # Flatten {floor_idx: record} into a list of records per scene.
    save_json = {
        scene: list(floor_maps.values())
        for scene, floor_maps in per_scene_per_floor_maps.items()
    }
    # Context manager guarantees the file is flushed and closed.
    with open(json_save_path, "w") as fp:
        json.dump(save_json, fp)