in occant_baselines/generate_topdown_maps/generate_environment_layouts.py [0:0]
def main(args):
    """Build and save a ground-truth global layout map for every episode.

    For each episode of the configured dataset split, the agent is placed on
    a regular grid of top-down-map cells whose value is non-zero and turned
    through headings spaced pi/3 apart over [-pi, pi). The ``ego_map_gt``
    observation at every pose is registered into a global map with
    ``Mapper.ext_register_map``, and the final map is written to
    ``<save_dir>/episode_id_<episode_id>.npy`` with layout (M, M, 2).

    Args:
        args: Namespace-like object providing
            ``global_map_size`` - side length M of the square global map,
            ``config_path`` - path passed to
            ``habitat_extensions.get_extended_config``,
            ``save_dir`` - output directory (created via ``safe_mkdir``).

    Note:
        Tensors are placed on the hard-coded device ``cuda:0``.
    """
    # The mapper uses the default config with a fixed local-map geometry;
    # the task/dataset config is loaded separately further down.
    mapper_config = get_config().RL.ANS.MAPPER
    mapper_config.defrost()
    mapper_config.map_size = 65
    mapper_config.map_scale = 0.05
    mapper_config.freeze()
    mapper = Mapper(mapper_config, None)

    global_map_size = args.global_map_size
    config_path = args.config_path
    save_dir = args.save_dir
    safe_mkdir(save_dir)

    config = habitat_extensions.get_extended_config(config_path)

    # The dataset file is only needed to know how many episodes to iterate.
    dataset_path = config.DATASET.DATA_PATH.replace("{split}", config.DATASET.SPLIT)
    with gzip.open(dataset_path, "rt") as fp:
        num_episodes = len(json.load(fp)["episodes"])

    env = DummyRLEnv(config=config)
    env.seed(1234)
    device = torch.device("cuda:0")

    # ---- Episode-independent quantities, computed once -------------------
    grid_size = config.TASK.GT_EGO_MAP.MAP_SCALE
    coordinate_max = maps.COORDINATE_MAX
    coordinate_min = maps.COORDINATE_MIN
    resolution = int((coordinate_max - coordinate_min) / grid_size)
    grid_resolution = (resolution, resolution)
    # Sample one cell every ``1 / grid_size`` cells (at least every cell).
    stride = max(int(1.0 / grid_size), 1)
    # (sin(theta/2), cos(theta/2)) for each sampled heading; these form the
    # y and last components of the rotation passed to the simulator
    # (presumably an (x, y, z, w) quaternion about the y axis - confirm
    # against the simulator API).
    half_angle_terms = [
        (np.sin(theta / 2).item(), np.cos(theta / 2).item())
        for theta in np.arange(-np.pi, np.pi, np.pi / 3.0)
    ]

    for _ in tqdm.tqdm(range(num_episodes)):
        env.reset()
        habitat_env = env.habitat_env
        sim = habitat_env.sim
        task = habitat_env.task
        episode = habitat_env.current_episode

        # Initialize a global map for the episode
        global_map = torch.zeros(
            1, 2, global_map_size, global_map_size, device=device
        )

        top_down_map = maps.get_topdown_map(sim, grid_resolution, 20000)
        n_rows, n_cols = top_down_map.shape

        # Regular grid of candidate cells; keep those with a non-zero map
        # value (presumably the navigable cells - verify against
        # ``maps.get_topdown_map``).
        row_idx = np.arange(0, n_rows, stride, dtype=int)
        col_idx = np.arange(0, n_cols, stride, dtype=int)
        coors = np.stack(np.meshgrid(row_idx, col_idx), axis=2).reshape(-1, 2)
        valid_coors = coors[top_down_map[coors[:, 0], coors[:, 1]] > 0]

        # Convert grid indices back to simulator coordinates; the height is
        # taken from the agent's pose right after the reset.
        real_x_vals = coordinate_max - valid_coors[:, 0] * grid_size
        real_z_vals = coordinate_min + valid_coors[:, 1] * grid_size
        start_y = float(sim.get_agent_state().position[1])

        for real_x, real_z in zip(real_x_vals.tolist(), real_z_vals.tolist()):
            for sin_half, cos_half in half_angle_terms:
                sim_obs = sim.get_observations_at(
                    [real_x, start_y, real_z],
                    [0.0, sin_half, 0.0, cos_half],
                    keep_agent_at_new_pose=True,
                )
                obs = task.sensor_suite.get_observations(
                    observations=sim_obs, episode=episode, task=task
                )
                ego_map_gt = torch.Tensor(obs["ego_map_gt"]).to(device)
                ego_map_gt = rearrange(ego_map_gt, "h w c -> () c h w")
                pose_gt = torch.Tensor(obs["pose_gt"]).unsqueeze(0).to(device)
                global_map = mapper.ext_register_map(global_map, ego_map_gt, pose_gt)

        # Save data
        global_map_np = asnumpy(rearrange(global_map, "b c h w -> b h w c")[0])
        np.save(
            os.path.join(save_dir, f"episode_id_{episode.episode_id}.npy"),
            global_map_np,
        )