in environments/gym-avd/gym_avd/envs/avd_pose_env.py [0:0]
def _sample_pose_refs(self):
r"""Sample views from landmark clusters as the pose references.
"""
# Load landmark clusters information.
cluster_root = self.cluster_root_dir
json_path = f"{cluster_root}/{self.scene_id}/reference_clusters.json"
self.cluster_info = json.load(open(json_path, "r"))
# Restrict to clusters with variance in position less than 1.5m>
filtered_clusters = filter(lambda x: x["var"] < 1.5, self.cluster_info)
filtered_clusters = list(filtered_clusters)
num_clusters = len(filtered_clusters)
# Sort clusters by proximity to starting position.
start_position = np.array(self.start_position)
unsorted_clusters = [
(i, np.array(cluster["cluster_position"]) * 1000.0) # m -> mm
for i, cluster in enumerate(filtered_clusters)
]
sorted_clusters = sorted(
unsorted_clusters, key=lambda c: np.linalg.norm(start_position - c[1]),
)
self._pose_image_names = []
self._pose_refs = []
self._pose_refs_depth = []
self.ref_positions = []
self.ref_poses = []
self._pose_regress = []
# Sample cluster indices.
if self.ref_sample_intervals is None:
# If no sampling intervals are specified, sample all clusters
# uniformly.
sample_idxes = np.linspace(0, num_clusters - 1, self.nRef)
sample_idxes = sample_idxes.astype(int).tolist()
else:
# The sampling intervals specify at what distance intervals from
# the starting position to sample from. If the sampling intervals
# are specified, divide the total number of references equally
# between the intervals.
sample_idxes = []
intervals = self.ref_sample_intervals
intervals = [0] + intervals + [math.inf]
samples_per_interval = min(self.nRef // (len(intervals) - 1), 1)
for i in range(len(intervals) - 1):
# For each interval, identify the set of clusters whose
# centroid lies within this distance interval.
valid_clusters = []
for j, c in enumerate(sorted_clusters):
dist = np.linalg.norm(start_position - c[1])
if (dist >= intervals[i]) and (dist < intervals[i + 1]):
valid_clusters.append(j)
if i != len(intervals) - 2:
nsamples = samples_per_interval
else:
nsamples = self.nRef - samples_per_interval * (len(intervals) - 2)
if len(valid_clusters) > 0:
# If there are valid clusters, sample from them (with
# replacement).
sample_idxes += self._rng.choices(valid_clusters, k=nsamples,)
else:
# If there are no valid clusters, set as invalid clusters.
sample_idxes += [None for _ in range(nsamples)]
# Sample references from the sampled cluster indices.
valid_masks = []
for i in sample_idxes:
if i is None:
# If the reference is invalid, enter dummy data.
valid_masks.append(0)
self._pose_image_names.append("")
self._pose_refs.append(
np.zeros((1, self.HEIGHT, self.WIDTH, 3), dtype=np.uint8)
)
pose_ref_depth = self._process_depth(
np.zeros((self.HEIGHT, self.WIDTH, 1), dtype=np.uint8)
)
pose_ref_depth = pose_ref_depth[np.newaxis, :, :, :]
self._pose_refs_depth.append(pose_ref_depth)
self.ref_positions.append([0, 0, 0])
self.ref_poses.append(0)
self._pose_regress.append((0, 0, 0, 0))
continue
# Randomly pick an image from the current cluster.
cluster_idx = sorted_clusters[i][0]
pose_image = self._rng.choice(filtered_clusters[cluster_idx]["images"])
# Compute data for the pose references.
ref_position = self._get_position(pose_image)
ref_pose = self._get_pose(pose_image)
pose_idx = self.images_to_idx[pose_image]
pose_ref = self.scene_images[pose_idx]
pose_ref_depth = self._process_depth(self.scene_depth[pose_idx])
pose_ref = pose_ref[np.newaxis, :, :, :]
pose_ref_depth = pose_ref_depth[np.newaxis, :, :, :]
# Compute reference pose relative to agent's starting pose.
dx = ref_position[0] - self.start_position[0]
dz = ref_position[2] - self.start_position[2]
dr = math.sqrt(dx ** 2 + dz ** 2)
dtheta = math.atan2(dz, dx) - self.start_pose
dhead = ref_pose - self.start_pose
delev = 0.0
pose_regress = (dr, dtheta, dhead, delev)
# Update the set of pose references.
valid_masks.append(1)
self._pose_image_names.append(pose_image)
self._pose_refs.append(pose_ref)
self._pose_refs_depth.append(pose_ref_depth)
self.ref_positions.append(ref_position)
self.ref_poses.append(ref_pose)
self._pose_regress.append(pose_regress)
self._pose_refs = np.concatenate(self._pose_refs, axis=0)
self._pose_refs_depth = np.concatenate(self._pose_refs_depth, axis=0)
self.ref_positions = np.array(self.ref_positions)
self.ref_poses = np.array(self.ref_poses)
self._pose_regress = np.array(self._pose_regress)
self.oracle_pose_successes = np.zeros((self.nRef,))
self._valid_masks = np.array(valid_masks)