in ss_baselines/av_wan/models/mapper.py [0:0]
def update(self, prev_action: int, ego_map, intensity) -> Tuple[list, list]:
if logging.root.level == logging.DEBUG:
self._prev_geometric_map = np.copy(self._geometric_map)
if prev_action == HabitatSimActions.MOVE_FORWARD:
self._x += int(self._stride * np.cos(np.deg2rad(self._orientation)))
self._y += int(self._stride * np.sin(np.deg2rad(self._orientation)))
elif prev_action == HabitatSimActions.TURN_LEFT:
self._orientation = (self._orientation - 90) % 360
elif prev_action == HabitatSimActions.TURN_RIGHT:
self._orientation = (self._orientation + 90) % 360
else:
# do nothing for the first step
pass
# update global map
rotated_geometric_map = rotate_map(self._geometric_map, -self._rotation, create_copy=False)
rotated_x, rotated_y = transform_coordinates(self._x, self._y, -self._rotation,
self._geometric_map.shape[1], self._geometric_map.shape[0])
left = rotated_x - int(ego_map.shape[1] / 2)
right = left + ego_map.shape[1]
# does not update the agent's current location
top = rotated_y
bottom = top - ego_map.shape[0]
rotated_geometric_map[bottom: top, left: right, :] = \
np.logical_or(rotated_geometric_map[bottom: top, left: right, :] > 0.5, ego_map > 0.5)
# update acoustic map
if self._use_acoustic_map:
am_x = self._x // self._stride
am_y = self._y // self._stride
if self._am_encoding == 'intensity':
self._acoustic_map[am_y, am_x, 0] = intensity
elif self._am_encoding == 'average_intensity':
if self._acoustic_map[am_y, am_x] == 0:
self._acoustic_map[am_y, am_x] = intensity
else:
self._acoustic_map[am_y, am_x] = 0.5 * intensity + 0.5 * self._acoustic_map[am_y, am_x]
# compute new blocked paths and non-navigable points in the affected region
new_left = max(left - self._stride, 0)
new_bottom = max(bottom - self._stride, 0)
new_right = min(right + self._stride, self._geometric_map.shape[1])
new_top = min(top + self._stride, self._geometric_map.shape[0])
m = self._stride
navigable_xs = []
for n in range(int((new_left - rotated_x) / m), int((new_right + 1 - rotated_x) / m)):
navigable_xs.append(rotated_x + n * m)
navigable_ys = []
for n in range(int((new_bottom - rotated_y) / m), int((new_top + 1 - rotated_y) / m)):
navigable_ys.append(rotated_y + n * m)
def convert(a, b):
return transform_coordinates(a, b, self._rotation, rotated_geometric_map.shape[1], rotated_geometric_map.shape[0])
non_navigable_points = []
blocked_paths = []
for idx_y, y in enumerate(navigable_ys):
for idx_x, x in enumerate(navigable_xs):
if rotated_geometric_map[y, x, 0]:
if x == rotated_x and y == rotated_y:
logging.warning("Mapper: marked current position as obstacle")
self._geometric_map[self._y, self._x, 0] = 0
else:
non_navigable_points.append(convert(x, y))
# no obstacle to the next navigable point along +Z direction
if idx_y < len(navigable_ys) - 1:
next_y = navigable_ys[idx_y + 1]
if any(rotated_geometric_map[y: next_y + 1, x, 0]):
blocked_paths.append((convert(x, y), convert(x, next_y)))
# no obstacle to the next navigable point along +X direction
if idx_x < len(navigable_xs) - 1:
next_x = navigable_xs[idx_x + 1]
if any(rotated_geometric_map[y, x: next_x + 1, 0]):
blocked_paths.append((convert(x, y), convert(next_x, y)))
assert (self._x, self._y) not in non_navigable_points
return non_navigable_points, blocked_paths