in ss_baselines/av_wan/models/planner.py [0:0]
def _map_to_graph(self, geometric_map: np.array) -> nx.Graph:
    """Build a grid graph over the navigable points and keep its largest component.

    A cell of ``geometric_map`` is treated as an obstacle only when both
    channel 0 and channel 1 are ``>= self._obstacle_threshold``.  Every
    non-obstacle point ``(x, y)`` drawn from ``self._navigable_xs`` /
    ``self._navigable_ys`` is linked to the next navigable point along
    each axis, provided no obstacle cell lies on the straight segment
    between them (both end points included).

    Nodes are keyed by ``y * len(self._navigable_ys) + x`` and carry a
    ``map_index=(x, y)`` attribute.  Points that end up with no edge at
    all are never inserted into the graph.

    Args:
        geometric_map: array indexed as ``[row, col, channel]`` with at
            least two channels.

    Returns:
        A new ``nx.Graph`` holding only the connected component with the
        most nodes.

    Raises:
        ValueError: if no edge could be created, so there is no
            component to pick the maximum from.
    """
    threshold = self._obstacle_threshold
    # True marks an obstacle; False is free or unexplored space.
    blocked = np.bitwise_and(geometric_map[:, :, 0] >= threshold,
                             geometric_map[:, :, 1] >= threshold)

    ys = self._navigable_ys
    xs = self._navigable_xs
    # NOTE(review): the same stride is used for both axes, so node ids are
    # only unique when every x is smaller than len(ys) -- confirm.
    stride = len(ys)
    last_row = len(ys) - 1
    last_col = len(xs) - 1

    grid = nx.Graph()

    def connect(src_id, src_xy, dst_id, dst_xy):
        # Register whichever end points are still missing (source first,
        # so node insertion order is stable), then join them.
        if src_id not in grid:
            grid.add_node(src_id, map_index=src_xy)
        if dst_id not in grid:
            grid.add_node(dst_id, map_index=dst_xy)
        grid.add_edge(src_id, dst_id)

    for row, y in enumerate(ys):
        for col, x in enumerate(xs):
            if blocked[y, x]:
                # The point itself is an obstacle: nothing to link from here.
                continue
            here = y * stride + x

            # Link to the next navigable point along +Z (next y) when the
            # segment between the two points is obstacle-free.
            if row < last_row:
                y_after = ys[row + 1]
                if not blocked[y:y_after + 1, x].any():
                    connect(here, (x, y), y_after * stride + x, (x, y_after))

            # Same check along +X (next x).
            if col < last_col:
                x_after = xs[col + 1]
                if not blocked[y, x:x_after + 1].any():
                    connect(here, (x, y), y * stride + x_after, (x_after, y))

    # Trim the graph down to its largest connected component.
    biggest_component = max(nx.connected_components(grid), key=len)
    return nx.Graph(grid.subgraph(biggest_component))