opensfm/synthetic_data/synthetic_dataset.py (166 lines of code) (raw):
import collections
import logging
import os
import shelve
from typing import Optional, Dict, Any, List, Tuple, Union
import numpy as np
from opensfm import tracking, features as oft, types, pymap, pygeometry, io, geo
from opensfm.dataset import DataSet
logger = logging.getLogger(__name__)
class SyntheticFeatures(collections.abc.MutableMapping):
database: Union[Dict[str, oft.FeaturesData], shelve.Shelf]
def __init__(self, on_disk_filename: Optional[str]) -> None:
if on_disk_filename:
self.database = shelve.open(on_disk_filename, flag="n")
else:
self.database = {}
for m in ["keys", "items", "values", "get"]:
setattr(self, m, getattr(self.database, m))
def sync(self) -> None:
database = self.database
if type(database) is dict:
return
else:
database.sync()
def __getitem__(self, key):
return self.database.__getitem__(key)
def __setitem__(self, key, item):
return self.database.__setitem__(key, item)
def __delitem__(self, key):
return self.database.__delitem__(key)
def __iter__(self):
return self.database.__iter__()
def __len__(self):
return self.database.__len__()
class SyntheticDataSet(DataSet):
reconstruction: types.Reconstruction
exifs: Dict[str, Any]
features: Optional[SyntheticFeatures]
reference: geo.TopocentricConverter
gcps: Optional[Dict[str, pymap.GroundControlPoint]]
def __init__(
self,
reconstruction: types.Reconstruction,
exifs: Dict[str, Any],
features: Optional[SyntheticFeatures] = None,
tracks_manager: Optional[pymap.TracksManager] = None,
gcps: Optional[Dict[str, pymap.GroundControlPoint]] = None,
output_path: Optional[str] = None,
) -> None:
data_path = "" if not output_path else output_path
if data_path:
io.mkdir_p(data_path)
io.mkdir_p(os.path.join(data_path, "images"))
super(SyntheticDataSet, self).__init__(data_path)
self.reconstruction = reconstruction
self.exifs = exifs
self.gcps = gcps
self.features = features
self.tracks_manager = tracks_manager
self.image_list = list(reconstruction.shots.keys())
self.reference = reconstruction.reference
self.matches = None
self.config["use_altitude_tag"] = True
self.config["align_method"] = "naive"
def images(self) -> List[str]:
return self.image_list
def _raise_if_absent_image(self, image: str):
if image not in self.image_list:
raise RuntimeError("Image isn't present in the synthetic dataset")
def load_camera_models(self) -> Dict[str, pygeometry.Camera]:
return self.reconstruction.cameras
def save_camera_models(self, camera_models: Dict[str, pygeometry.Camera]) -> None:
for camera in camera_models.values():
self.reconstruction.add_camera(camera)
def load_rig_cameras(self) -> Dict[str, pymap.RigCamera]:
return self.reconstruction.rig_cameras
def load_rig_assignments(self) -> Dict[str, List[Tuple[str, str]]]:
rig_assignments = {}
for instance in self.reconstruction.rig_instances.values():
rig_assignments[instance.id] = [
(k, v.id) for k, v in instance.rig_cameras.items()
]
return rig_assignments
def load_exif(self, image: str) -> Dict[str, Any]:
self._raise_if_absent_image(image)
return self.exifs[image]
def exif_exists(self, image: str) -> bool:
return image in self.image_list
def features_exist(self, image: str) -> bool:
if image not in self.image_list:
return False
if self.features is None:
return False
feat = self.features
if feat is None:
return False
return image in feat
def load_words(self, image: str):
self._raise_if_absent_image(image)
n_closest = 50
return [image] * n_closest
def load_features(self, image: str) -> Optional[oft.FeaturesData]:
self._raise_if_absent_image(image)
if not self.features:
return None
feat = self.features
if feat is None:
return None
return feat[image]
def save_features(self, image: str, features_data: oft.FeaturesData) -> None:
pass
def matches_exists(self, image: str) -> bool:
if image not in self.image_list:
return False
self._check_and_create_matches()
if self.matches is None:
return False
return True
def load_matches(self, image: str) -> Dict[str, np.ndarray]:
self._raise_if_absent_image(image)
self._check_and_create_matches()
if self.matches is not None:
return self.matches[image]
else:
return {}
def load_image_list(self) -> None:
pass
def _check_and_create_matches(self) -> None:
if self.matches is None:
self.matches = self._construct_matches()
def _construct_matches(self) -> Dict[str, Any]:
matches = {}
tracks_manager = self.load_tracks_manager()
for im1 in self.images():
for im2 in self.images():
if im1 == im2:
continue
image_matches = matches.setdefault(im1, {})
tracks = tracking.common_tracks(tracks_manager, im1, im2)[0]
if len(tracks) > 10:
pair_matches = []
for t in tracks:
observations = tracks_manager.get_track_observations(t)
pair_matches.append(
np.array([observations[im1].id, observations[im2].id])
)
image_matches[im2] = np.array(pair_matches)
return matches
def load_tracks_manager(
self, filename: Optional[str] = None
) -> pymap.TracksManager:
tracks_mgr = self.tracks_manager
if not tracks_mgr:
raise RuntimeError("No tracks manager for the synthetic dataset")
return tracks_mgr
def init_reference(self, images: Optional[List[str]] = None) -> None:
pass
def load_reference(self) -> geo.TopocentricConverter:
return self.reference
def reference_exists(self) -> bool:
return True
def load_ground_control_points(self) -> List[pymap.GroundControlPoint]:
if self.gcps:
return list(self.gcps.values())
else:
return []