in environments/gym-avd/gym_avd/envs/avd_nav_env.py [0:0]
def reset(self, scene_idx: Optional[int] = None):
# Spawn agent in a new scene at a random image.
if self.episodes is None:
print("Sampling random episodes!")
if scene_idx is None:
scenes = getattr(self, "{}_scenes".format(self.split))
self.scene_idx = self._rng.choice(scenes)
else:
self.scene_idx = scene_idx
else:
if self.episodes_idx is None:
self.episodes_idx = 0
else:
self.episodes_idx = (self.episodes_idx + 1) % len(self.episodes)
self.scene_idx = self.episodes[self.episodes_idx]["scene_idx"]
print(
"Sampling from pre-defined episodes! Scene id: "
f"{self.scene_idx}, Episode id: {self.episodes_idx}"
)
self.scene_id = self.all_scenes[self.scene_idx]
scene_conn = self.data_conn[self.scene_idx]
self.images_to_idx = scene_conn["images_to_idx"]
self.images_to_camera = scene_conn["images_to_camera"]
self.images_to_nodes = scene_conn["images_to_nodes"]
self.scale = scene_conn["scale"] # Converts positions to mm.
if self.episodes is None:
self.agent_image = self._rng.choice(list(self.images_to_idx.keys()))
while not self.is_valid_image(self.agent_image):
self.agent_image = self._rng.choice(list(self.images_to_idx.keys()))
else:
self.agent_image = self.episodes[self.episodes_idx]["start_image"]
# Initialize the environment variables.
self.start_position = self._get_position(self.agent_image)
self.start_pose = self._get_pose(self.agent_image)
self.exp_start_image = self.agent_image
self.exp_start_position = copy.deepcopy(self.start_position)
self.exp_start_pose = self.start_pose
self.agent_pose = self.start_pose
self.agent_position = copy.deepcopy(self.start_position)
self.scene_images = np.array(self.data_h5[f"{self.scene_id}/rgb"])
self.scene_depth = np.array(self.data_h5[f"{self.scene_id}/depth"])
self.delta = (0.0, 0.0, 0.0, 0.0)
self.collision_occurred = False
self.steps = 0
self.graph = create_nav_graph(self.data_conn[self.scene_idx])
self._load_nav_graph()
# Get scene annotations.
annotations_path = f"{ROOT_DIR}/{self.scene_id}/annotations.json"
self.annotations = json.load(open(annotations_path, "r"))
self.visited_instances = set([])
instances_path = (
f"{VALID_INSTANCES_ROOT_DIR}/{self.scene_id}/instances.json.gzip"
)
with gzip.open(instances_path, "rt") as fp:
self.valid_instances = json.load(fp)
# Sample pose reference.
self._sample_pose_refs()
# Sample random exploration oracle targets.
self._sample_oracle_targets()
self._stuck_at_oracle = False
# Path taken so far.
self.exp_path_so_far = []
self.nav_path_so_far = []
self.top_down_env = None
# Count-based rewards.
self._setup_counts()
# Create occupancy map related variables.
min_x, min_y, max_x, max_y = self.get_environment_extents()
self.L_min = min(min_x, min_y) - 4000
self.L_max = max(max_x, max_y) + 4000
self.grid_size = self.map_scale
grid_num = np.array([self.map_size, self.map_size])
self.grids_mat.fill(0)
self.count_grids_mat.fill(0)
self.max_grid_size = np.max(grid_num)
self.max_seen_area = float(np.prod(grid_num))
self.seen_area = 0
self.seen_count_reward = 0.0
# Sample target.
self._sample_target()
# Navigation evaluation stuff.
self._path_length_so_far = 0.0
self._start_regress = np.array([0, 0, 0, 0])
return self._get_obs()