opensfm/tracking.py (171 lines of code) (raw):

import logging import typing as t import networkx as nx import numpy as np from opensfm import pymap from opensfm.dataset_base import DataSetBase from opensfm.unionfind import UnionFind logger = logging.getLogger(__name__) def load_features( dataset: DataSetBase, images: t.List[str] ) -> t.Tuple[ t.Dict[str, np.ndarray], t.Dict[str, np.ndarray], t.Dict[str, np.ndarray], t.Dict[str, np.ndarray], ]: logging.info("reading features") features = {} colors = {} segmentations = {} instances = {} for im in images: features_data = dataset.load_features(im) if not features_data: continue features[im] = features_data.points[:, :3] colors[im] = features_data.colors semantic_data = features_data.semantic if semantic_data: segmentations[im] = semantic_data.segmentation if semantic_data.has_instances(): instances[im] = semantic_data.instances return features, colors, segmentations, instances def load_matches( dataset: DataSetBase, images: t.List[str] ) -> t.Dict[t.Tuple[str, str], t.List[t.Tuple[int, int]]]: matches = {} for im1 in images: try: im1_matches = dataset.load_matches(im1) except IOError: continue for im2 in im1_matches: if im2 in images: matches[im1, im2] = im1_matches[im2] return matches def create_tracks_manager( features: t.Dict[str, np.ndarray], colors: t.Dict[str, np.ndarray], segmentations: t.Dict[str, np.ndarray], instances: t.Dict[str, np.ndarray], matches: t.Dict[t.Tuple[str, str], t.List[t.Tuple[int, int]]], min_length: int, ): """Link matches into tracks.""" logger.debug("Merging features onto tracks") uf = UnionFind() for im1, im2 in matches: for f1, f2 in matches[im1, im2]: uf.union((im1, f1), (im2, f2)) sets = {} for i in uf: p = uf[i] if p in sets: sets[p].append(i) else: sets[p] = [i] tracks = [t for t in sets.values() if _good_track(t, min_length)] logger.debug("Good tracks: {}".format(len(tracks))) NO_VALUE = pymap.Observation.NO_SEMANTIC_VALUE tracks_manager = pymap.TracksManager() for track_id, track in enumerate(tracks): for image, featureid in track: if image not in features: continue x, y, s = features[image][featureid] r, g, b = colors[image][featureid] segmentation, instance = ( segmentations[image][featureid] if image in segmentations else NO_VALUE, instances[image][featureid] if image in instances else NO_VALUE, ) obs = pymap.Observation( x, y, s, int(r), int(g), int(b), featureid, segmentation, instance ) tracks_manager.add_observation(image, str(track_id), obs) return tracks_manager def common_tracks( tracks_manager: pymap.TracksManager, im1: str, im2: str ) -> t.Tuple[t.List[str], np.ndarray, np.ndarray]: """List of tracks observed in both images. Args: tracks_manager: tracks manager im1: name of the first image im2: name of the second image Returns: tuple: tracks, feature from first image, feature from second image """ t1 = tracks_manager.get_shot_observations(im1) t2 = tracks_manager.get_shot_observations(im2) tracks, p1, p2 = [], [], [] for track, obs in t1.items(): if track in t2: p1.append(obs.point) p2.append(t2[track].point) tracks.append(track) p1 = np.array(p1) p2 = np.array(p2) return tracks, p1, p2 TPairTracks = t.Tuple[t.List[str], np.ndarray, np.ndarray] def all_common_tracks_with_features( tracks_manager: pymap.TracksManager, min_common: int = 50, ) -> t.Dict[t.Tuple[str, str], TPairTracks]: tracks = all_common_tracks( tracks_manager, include_features=True, min_common=min_common ) return t.cast(t.Dict[t.Tuple[str, str], TPairTracks], tracks) def all_common_tracks_without_features( tracks_manager: pymap.TracksManager, min_common: int = 50, ) -> t.Dict[t.Tuple[str, str], t.List[str]]: tracks = all_common_tracks( tracks_manager, include_features=False, min_common=min_common ) return t.cast(t.Dict[t.Tuple[str, str], t.List[str]], tracks) def all_common_tracks( tracks_manager: pymap.TracksManager, include_features: bool = True, min_common: int = 50, ) -> t.Dict[t.Tuple[str, str], t.Union[TPairTracks, t.List[str]]]: """List of tracks observed by each image pair. Args: tracks_manager: tracks manager include_features: whether to include the features from the images min_common: the minimum number of tracks the two images need to have in common Returns: tuple: im1, im2 -> tuple: tracks, features from first image, features from second image """ common_tracks = {} for (im1, im2), size in tracks_manager.get_all_pairs_connectivity().items(): if size < min_common: continue tuples = tracks_manager.get_all_common_observations(im1, im2) if include_features: common_tracks[im1, im2] = ( [v for v, _, _ in tuples], np.array([p.point for _, p, _ in tuples]), np.array([p.point for _, _, p in tuples]), ) else: common_tracks[im1, im2] = [v for v, _, _ in tuples] return common_tracks def _good_track(track: t.List[t.Tuple[str, int]], min_length: int) -> bool: if len(track) < min_length: return False images = [f[0] for f in track] if len(images) != len(set(images)): return False return True def as_weighted_graph(tracks_manager: pymap.TracksManager) -> nx.Graph: """Return the tracks manager as a weighted graph having shots a snodes and weighted by the # of common tracks between two nodes. """ images = tracks_manager.get_shot_ids() image_graph = nx.Graph() for im in images: image_graph.add_node(im) for k, v in tracks_manager.get_all_pairs_connectivity().items(): image_graph.add_edge(k[0], k[1], weight=v) return image_graph def as_graph(tracks_manager: pymap.TracksManager) -> nx.Graph: """Return the tracks manager as a bipartite graph (legacy).""" tracks = tracks_manager.get_track_ids() images = tracks_manager.get_shot_ids() graph = nx.Graph() for track_id in tracks: graph.add_node(track_id, bipartite=1) for shot_id in images: graph.add_node(shot_id, bipartite=0) for track_id in tracks: for im, obs in tracks_manager.get_track_observations(track_id).items(): graph.add_edge( im, track_id, feature=obs.point, feature_scale=obs.scale, feature_id=obs.id, feature_color=obs.color, feature_segmentation=obs.segmentation, feature_instance=obs.instance, ) return graph