def _map_to_graph()

in ss_baselines/av_wan/models/planner.py [0:0]


    def _map_to_graph(self, geometric_map: np.array) -> nx.Graph:
        # after bitwise_and op, 0 indicates free or unexplored, 1 indicate obstacles
        occupancy_map = np.bitwise_and(geometric_map[:, :, 0] >= self._obstacle_threshold,
                                       geometric_map[:, :, 1] >= self._obstacle_threshold)
        graph = nx.Graph()
        for idx_y, y in enumerate(self._navigable_ys):
            for idx_x, x in enumerate(self._navigable_xs):
                node_index = y * len(self._navigable_ys) + x

                if occupancy_map[y][x]:
                    # obstacle
                    continue

                # no obstacle to the next navigable point along +Z direction
                if idx_y < len(self._navigable_ys) - 1:
                    next_y = self._navigable_ys[idx_y + 1]
                    if not any(occupancy_map[y: next_y+1, x]):
                        next_node_index = next_y * len(self._navigable_ys) + x
                        if node_index not in graph:
                            graph.add_node(node_index, map_index=(x, y))
                        if next_node_index not in graph:
                            graph.add_node(next_node_index, map_index=(x, next_y))
                        graph.add_edge(node_index, next_node_index)

                # no obstacle to the next navigable point along +X direction
                if idx_x < len(self._navigable_xs) - 1:
                    next_x = self._navigable_xs[idx_x + 1]
                    if not any(occupancy_map[y, x: next_x+1]):
                        next_node_index = y * len(self._navigable_ys) + next_x
                        if node_index not in graph:
                            graph.add_node(node_index, map_index=(x, y))
                        if next_node_index not in graph:
                            graph.add_node(next_node_index, map_index=(next_x, y))
                        graph.add_edge(node_index, next_node_index)

        # trim the graph such that it only keeps the largest subgraph
        connected_subgraphs = (graph.subgraph(c) for c in nx.connected_components(graph))
        max_connected_graph = max(connected_subgraphs, key=len)

        return nx.Graph(max_connected_graph)