opensfm/reconstruction.py (1,302 lines of code) (raw):

"""Incremental reconstruction pipeline""" import datetime import enum import logging import math from abc import abstractmethod, ABC from collections import defaultdict from itertools import combinations from timeit import default_timer as timer from typing import Dict, Any, List, Tuple, Set, Optional import cv2 import numpy as np from opensfm import ( log, matching, multiview, pybundle, pygeometry, pymap, pysfm, rig, tracking, types, reconstruction_helpers as helpers, ) from opensfm.align import align_reconstruction, apply_similarity from opensfm.context import current_memory_usage, parallel_map from opensfm.dataset_base import DataSetBase logger = logging.getLogger(__name__) class ReconstructionAlgorithm(str, enum.Enum): INCREMENTAL = "incremental" TRIANGULATION = "triangulation" def _get_camera_from_bundle( ba: pybundle.BundleAdjuster, camera: pygeometry.Camera ) -> None: """Read camera parameters from a bundle adjustment problem.""" c = ba.get_camera(camera.id) for k, v in c.get_parameters_map().items(): camera.set_parameter_value(k, v) def _add_gcp_to_bundle( ba: pybundle.BundleAdjuster, reference: types.TopocentricConverter, gcp: List[pymap.GroundControlPoint], shots: Dict[str, pymap.Shot], gcp_horizontal_sd: float, gcp_vertical_sd: float, ) -> int: """Add Ground Control Points constraints to the bundle problem.""" added_gcps = 0 gcp_sd = np.array([gcp_horizontal_sd, gcp_horizontal_sd, gcp_vertical_sd]) for point in gcp: point_id = "gcp-" + point.id coordinates = multiview.triangulate_gcp( point, shots, reproj_threshold=1, min_ray_angle_degrees=0.1, ) if coordinates is None: if point.lla: coordinates = reference.to_topocentric(*point.lla_vec) else: logger.warning( "Cannot initialize GCP '{}'." " Ignoring it".format(point.id) ) continue ba.add_point(point_id, coordinates, False) if point.lla: point_enu = reference.to_topocentric(*point.lla_vec) ba.add_point_prior(point_id, point_enu, gcp_sd, point.has_altitude) for observation in point.observations: if observation.shot_id in shots: # TODO(pau): move this to a config or per point parameter. scale = 0.0001 ba.add_point_projection_observation( observation.shot_id, point_id, observation.projection, scale, ) added_gcps += 1 return added_gcps def bundle( reconstruction: types.Reconstruction, camera_priors: Dict[str, pygeometry.Camera], rig_camera_priors: Dict[str, pymap.RigCamera], gcp: Optional[List[pymap.GroundControlPoint]], config: Dict[str, Any], ) -> Dict[str, Any]: """Bundle adjust a reconstruction.""" report = pysfm.BAHelpers.bundle( reconstruction.map, dict(camera_priors), dict(rig_camera_priors), gcp if gcp is not None else [], config, ) logger.debug(report["brief_report"]) return report def bundle_shot_poses( reconstruction: types.Reconstruction, shot_ids: Set[str], camera_priors: Dict[str, pygeometry.Camera], rig_camera_priors: Dict[str, pymap.RigCamera], config: Dict[str, Any], ) -> Dict[str, Any]: """Bundle adjust a set of shots poses.""" report = pysfm.BAHelpers.bundle_shot_poses( reconstruction.map, shot_ids, dict(camera_priors), dict(rig_camera_priors), config, ) return report def bundle_local( reconstruction: types.Reconstruction, camera_priors: Dict[str, pygeometry.Camera], rig_camera_priors: Dict[str, pymap.RigCamera], gcp: Optional[List[pymap.GroundControlPoint]], central_shot_id: str, config: Dict[str, Any], ) -> Tuple[Dict[str, Any], List[int]]: """Bundle adjust the local neighborhood of a shot.""" pt_ids, report = pysfm.BAHelpers.bundle_local( reconstruction.map, dict(camera_priors), dict(rig_camera_priors), gcp if gcp is not None else [], central_shot_id, config, ) logger.debug(report["brief_report"]) return pt_ids, report def shot_neighborhood( reconstruction: types.Reconstruction, central_shot_id: str, radius: int, min_common_points: int, max_interior_size: int, ) -> Tuple[Set[str], Set[str]]: """Reconstructed shots near a given shot. Returns: a tuple with interior and boundary: - interior: the list of shots at distance smaller than radius - boundary: shots sharing at least on point with the interior Central shot is at distance 0. Shots at distance n + 1 share at least min_common_points points with shots at distance n. """ max_boundary_size = 1000000 interior = {central_shot_id} for _distance in range(1, radius): remaining = max_interior_size - len(interior) if remaining <= 0: break neighbors = direct_shot_neighbors( reconstruction, interior, min_common_points, remaining ) interior.update(neighbors) boundary = direct_shot_neighbors(reconstruction, interior, 1, max_boundary_size) return interior, boundary def direct_shot_neighbors( reconstruction: types.Reconstruction, shot_ids: Set[str], min_common_points: int, max_neighbors: int, ) -> Set[str]: """Reconstructed shots sharing reconstructed points with a shot set.""" points = set() for shot_id in shot_ids: shot = reconstruction.shots[shot_id] valid_landmarks = shot.get_valid_landmarks() for track in valid_landmarks: if track.id in reconstruction.points: points.add(track) candidate_shots = set(reconstruction.shots) - set(shot_ids) common_points = defaultdict(int) for track in points: neighbors = track.get_observations() for neighbor in neighbors: if neighbor.id in candidate_shots: common_points[neighbor] += 1 pairs = sorted(common_points.items(), key=lambda x: -x[1]) neighbors = set() for neighbor, num_points in pairs[:max_neighbors]: if num_points >= min_common_points: neighbors.add(neighbor.id) else: break return neighbors def pairwise_reconstructability(common_tracks: int, rotation_inliers: int) -> float: """Likeliness of an image pair giving a good initial reconstruction.""" outliers = common_tracks - rotation_inliers outlier_ratio = float(outliers) / common_tracks if outlier_ratio >= 0.3: return outliers else: return 0 TPairArguments = Tuple[ str, str, np.ndarray, np.ndarray, pygeometry.Camera, pygeometry.Camera, float ] def compute_image_pairs( track_dict: Dict[Tuple[str, str], tracking.TPairTracks], data: DataSetBase ) -> List[Tuple[str, str]]: """All matched image pairs sorted by reconstructability.""" cameras = data.load_camera_models() args = _pair_reconstructability_arguments(track_dict, cameras, data) processes = data.config["processes"] result = parallel_map(_compute_pair_reconstructability, args, processes) result = list(result) pairs = [(im1, im2) for im1, im2, r in result if r > 0] score = [r for im1, im2, r in result if r > 0] order = np.argsort(-np.array(score)) return [pairs[o] for o in order] def _pair_reconstructability_arguments( track_dict: Dict[Tuple[str, str], tracking.TPairTracks], cameras: Dict[str, pygeometry.Camera], data: DataSetBase, ) -> List[TPairArguments]: threshold = 4 * data.config["five_point_algo_threshold"] args = [] for (im1, im2), (_, p1, p2) in track_dict.items(): camera1 = cameras[data.load_exif(im1)["camera"]] camera2 = cameras[data.load_exif(im2)["camera"]] args.append((im1, im2, p1, p2, camera1, camera2, threshold)) return args def _compute_pair_reconstructability(args: TPairArguments) -> Tuple[str, str, float]: log.setup() im1, im2, p1, p2, camera1, camera2, threshold = args R, inliers = two_view_reconstruction_rotation_only( p1, p2, camera1, camera2, threshold ) r = pairwise_reconstructability(len(p1), len(inliers)) return (im1, im2, r) def add_shot( data: DataSetBase, reconstruction: types.Reconstruction, rig_assignments: Dict[str, Tuple[str, str, List[str]]], shot_id: str, pose: pygeometry.Pose, ) -> Set[str]: """Add a shot to the recontruction. In case of a shot belonging to a rig instance, the pose of shot will drive the initial pose setup of the rig instance. All necessary shots and rig models will be created. """ added_shots = set() if shot_id not in rig_assignments: camera_id = data.load_exif(shot_id)["camera"] shot = reconstruction.create_shot(shot_id, camera_id, pose) shot.metadata = helpers.get_image_metadata(data, shot_id) added_shots = {shot_id} else: instance_id, _, instance_shots = rig_assignments[shot_id] rig_instance = reconstruction.add_rig_instance(pymap.RigInstance(instance_id)) for shot in instance_shots: _, rig_camera_id, _ = rig_assignments[shot] camera_id = data.load_exif(shot)["camera"] created_shot = reconstruction.create_shot( shot, camera_id, pygeometry.Pose(), rig_camera_id, instance_id, ) created_shot.metadata = helpers.get_image_metadata(data, shot) rig_instance.update_instance_pose_with_shot(shot_id, pose) added_shots = set(instance_shots) return added_shots def _two_view_reconstruction_inliers( b1: np.ndarray, b2: np.ndarray, R: np.ndarray, t: np.ndarray, threshold: float ) -> List[int]: """Returns indices of matches that can be triangulated.""" ok = matching.compute_inliers_bearings(b1, b2, R, t, threshold) return np.nonzero(ok)[0] def two_view_reconstruction_plane_based( b1: np.ndarray, b2: np.ndarray, threshold: float, ) -> Tuple[Optional[np.ndarray], Optional[np.ndarray], List[int]]: """Reconstruct two views from point correspondences lying on a plane. Args: b1, b2: lists bearings in the images threshold: reprojection error threshold Returns: rotation, translation and inlier list """ x1 = multiview.euclidean(b1) x2 = multiview.euclidean(b2) H, inliers = cv2.findHomography(x1, x2, cv2.RANSAC, threshold) motions = multiview.motion_from_plane_homography(H) if not motions: return None, None, [] if len(motions) == 0: return None, None, [] motion_inliers = [] for R, t, _, _ in motions: inliers = _two_view_reconstruction_inliers(b1, b2, R.T, -R.T.dot(t), threshold) motion_inliers.append(inliers) best = np.argmax(map(len, motion_inliers)) R, t, n, d = motions[best] inliers = motion_inliers[best] return cv2.Rodrigues(R)[0].ravel(), t, inliers def two_view_reconstruction_and_refinement( b1: np.ndarray, b2: np.ndarray, R: np.ndarray, t: np.ndarray, threshold: float, iterations: int, transposed: bool, ) -> Tuple[np.ndarray, np.ndarray, List[int]]: """Reconstruct two views using provided rotation and translation. Args: b1, b2: lists bearings in the images R, t: rotation & translation threshold: reprojection error threshold iterations: number of iteration for refinement transposed: use transposed R, t instead Returns: rotation, translation and inlier list """ if transposed: t_curr = -R.T.dot(t) R_curr = R.T else: t_curr = t.copy() R_curr = R.copy() inliers = _two_view_reconstruction_inliers(b1, b2, R_curr, t_curr, threshold) if len(inliers) > 5: T = multiview.relative_pose_optimize_nonlinear( b1[inliers], b2[inliers], t_curr, R_curr, iterations ) R_curr = T[:, :3] t_curr = T[:, 3] inliers = _two_view_reconstruction_inliers(b1, b2, R_curr, t_curr, threshold) return cv2.Rodrigues(R_curr.T)[0].ravel(), -R_curr.T.dot(t_curr), inliers def _two_view_rotation_inliers( b1: np.ndarray, b2: np.ndarray, R: np.ndarray, threshold: float ) -> List[int]: br2 = R.dot(b2.T).T ok = np.linalg.norm(br2 - b1, axis=1) < threshold return np.nonzero(ok)[0] def two_view_reconstruction_rotation_only( p1: np.ndarray, p2: np.ndarray, camera1: pygeometry.Camera, camera2: pygeometry.Camera, threshold: float, ) -> Tuple[np.ndarray, List[int]]: """Find rotation between two views from point correspondences. Args: p1, p2: lists points in the images camera1, camera2: Camera models threshold: reprojection error threshold Returns: rotation and inlier list """ b1 = camera1.pixel_bearing_many(p1) b2 = camera2.pixel_bearing_many(p2) R = multiview.relative_pose_ransac_rotation_only(b1, b2, threshold, 1000, 0.999) inliers = _two_view_rotation_inliers(b1, b2, R, threshold) return cv2.Rodrigues(R.T)[0].ravel(), inliers def two_view_reconstruction_5pt( b1: np.ndarray, b2: np.ndarray, R: np.ndarray, t: np.ndarray, threshold: float, iterations: int, check_reversal: bool = False, reversal_ratio: float = 1.0, ) -> Tuple[Optional[np.ndarray], Optional[np.ndarray], List[int]]: """Run 5-point reconstruction and refinement, given computed relative rotation and translation. Optionally, the method will perform reconstruction and refinement for both given and transposed rotation and translation. Args: p1, p2: lists points in the images camera1, camera2: Camera models threshold: reprojection error threshold iterations: number of step for the non-linear refinement of the relative pose check_reversal: whether to check for Necker reversal ambiguity reversal_ratio: ratio of triangulated point between normal and reversed configuration to consider a pair as being ambiguous Returns: rotation, translation and inlier list """ configurations = [False, True] if check_reversal else [False] # Refine both normal and transposed relative motion results_5pt = [] for transposed in configurations: R_5p, t_5p, inliers_5p = two_view_reconstruction_and_refinement( b1, b2, R, t, threshold, iterations, transposed, ) valid_curr_5pt = R_5p is not None and t_5p is not None if len(inliers_5p) <= 5 or not valid_curr_5pt: continue logger.info( f"Two-view 5-points reconstruction inliers (transposed={transposed}): {len(inliers_5p)} / {len(b1)}" ) results_5pt.append((R_5p, t_5p, inliers_5p)) # Use relative motion if one version stands out if len(results_5pt) == 1: R_5p, t_5p, inliers_5p = results_5pt[0] elif len(results_5pt) == 2: inliers1, inliers2 = results_5pt[0][2], results_5pt[1][2] len1, len2 = len(inliers1), len(inliers2) ratio = min(len1, len2) / max(len1, len2) if ratio > reversal_ratio: logger.warning( f"Un-decidable Necker configuration (ratio={ratio}), skipping." ) R_5p, t_5p, inliers_5p = None, None, [] else: index = 0 if len1 > len2 else 1 R_5p, t_5p, inliers_5p = results_5pt[index] else: R_5p, t_5p, inliers_5p = None, None, [] return R_5p, t_5p, inliers_5p def two_view_reconstruction_general( p1: np.ndarray, p2: np.ndarray, camera1: pygeometry.Camera, camera2: pygeometry.Camera, threshold: float, iterations: int, check_reversal: bool = False, reversal_ratio: float = 1.0, ) -> Tuple[Optional[np.ndarray], Optional[np.ndarray], List[int], Dict[str, Any]]: """Reconstruct two views from point correspondences. These will try different reconstruction methods and return the results of the one with most inliers. Args: p1, p2: lists points in the images camera1, camera2: Camera models threshold: reprojection error threshold iterations: number of step for the non-linear refinement of the relative pose check_reversal: whether to check for Necker reversal ambiguity reversal_ratio: ratio of triangulated point between normal and reversed configuration to consider a pair as being ambiguous Returns: rotation, translation and inlier list """ b1 = camera1.pixel_bearing_many(p1) b2 = camera2.pixel_bearing_many(p2) # Get 5-point relative motion T_robust = multiview.relative_pose_ransac(b1, b2, threshold, 1000, 0.999) R_robust = T_robust[:, :3] t_robust = T_robust[:, 3] R_5p, t_5p, inliers_5p = two_view_reconstruction_5pt( b1, b2, R_robust, t_robust, threshold, iterations, check_reversal, reversal_ratio, ) valid_5pt = R_5p is not None and t_5p is not None # Compute plane-based relative-motion R_plane, t_plane, inliers_plane = two_view_reconstruction_plane_based( b1, b2, threshold, ) valid_plane = R_plane is not None and t_plane is not None report: Dict[str, Any] = { "5_point_inliers": len(inliers_5p), "plane_based_inliers": len(inliers_plane), } if valid_5pt and len(inliers_5p) > len(inliers_plane): report["method"] = "5_point" R, t, inliers = R_5p, t_5p, inliers_5p elif valid_plane: report["method"] = "plane_based" R, t, inliers = R_plane, t_plane, inliers_plane else: report["decision"] = "Could not find initial motion" logger.info(report["decision"]) R, t, inliers = None, None, [] return R, t, inliers, report def reconstruction_from_relative_pose( data: DataSetBase, tracks_manager: pymap.TracksManager, im1: str, im2: str, R: np.ndarray, t: np.ndarray, ) -> Tuple[Optional[types.Reconstruction], Dict[str, Any]]: """Create a reconstruction from 'im1' and 'im2' using the provided rotation 'R' and translation 't'.""" report = {} min_inliers = data.config["five_point_algo_min_inliers"] camera_priors = data.load_camera_models() rig_camera_priors = data.load_rig_cameras() rig_assignments = rig.rig_assignments_per_image(data.load_rig_assignments()) reconstruction = types.Reconstruction() reconstruction.reference = data.load_reference() reconstruction.cameras = camera_priors reconstruction.rig_cameras = rig_camera_priors new_shots = add_shot(data, reconstruction, rig_assignments, im1, pygeometry.Pose()) if im2 not in new_shots: new_shots |= add_shot( data, reconstruction, rig_assignments, im2, pygeometry.Pose(R, t) ) align_reconstruction(reconstruction, [], data.config) triangulate_shot_features(tracks_manager, reconstruction, new_shots, data.config) logger.info("Triangulated: {}".format(len(reconstruction.points))) report["triangulated_points"] = len(reconstruction.points) if len(reconstruction.points) < min_inliers: report["decision"] = "Initial motion did not generate enough points" logger.info(report["decision"]) return None, report to_adjust = {s for s in new_shots if s != im1} bundle_shot_poses( reconstruction, to_adjust, camera_priors, rig_camera_priors, data.config ) retriangulate(tracks_manager, reconstruction, data.config) if len(reconstruction.points) < min_inliers: report[ "decision" ] = "Re-triangulation after initial motion did not generate enough points" logger.info(report["decision"]) return None, report bundle_shot_poses( reconstruction, to_adjust, camera_priors, rig_camera_priors, data.config ) report["decision"] = "Success" report["memory_usage"] = current_memory_usage() return reconstruction, report def bootstrap_reconstruction( data: DataSetBase, tracks_manager: pymap.TracksManager, im1: str, im2: str, p1: np.ndarray, p2: np.ndarray, ) -> Tuple[Optional[types.Reconstruction], Dict[str, Any]]: """Start a reconstruction using two shots.""" logger.info("Starting reconstruction with {} and {}".format(im1, im2)) report: Dict[str, Any] = { "image_pair": (im1, im2), "common_tracks": len(p1), } camera_priors = data.load_camera_models() camera1 = camera_priors[data.load_exif(im1)["camera"]] camera2 = camera_priors[data.load_exif(im2)["camera"]] threshold = data.config["five_point_algo_threshold"] iterations = data.config["five_point_refine_rec_iterations"] check_reversal = data.config["five_point_reversal_check"] reversal_ratio = data.config["five_point_reversal_ratio"] ( R, t, inliers, report["two_view_reconstruction"], ) = two_view_reconstruction_general( p1, p2, camera1, camera2, threshold, iterations, check_reversal, reversal_ratio ) if R is None or t is None: return None, report rec, rec_report = reconstruction_from_relative_pose( data, tracks_manager, im1, im2, R, t ) report.update(rec_report) return rec, report def reconstructed_points_for_images( tracks_manager: pymap.TracksManager, reconstruction: types.Reconstruction, images: Set[str], ) -> List[Tuple[str, int]]: """Number of reconstructed points visible on each image. Returns: A list of (image, num_point) pairs sorted by decreasing number of points. """ non_reconstructed = [im for im in images if im not in reconstruction.shots] res = pysfm.count_tracks_per_shot( tracks_manager, non_reconstructed, list(reconstruction.points.keys()) ) return sorted(res.items(), key=lambda x: -x[1]) def resect( data: DataSetBase, tracks_manager: pymap.TracksManager, reconstruction: types.Reconstruction, shot_id: str, threshold: float, min_inliers: int, ) -> Tuple[bool, Set[str], Dict[str, Any]]: """Try resecting and adding a shot to the reconstruction. Return: True on success. """ rig_assignments = rig.rig_assignments_per_image(data.load_rig_assignments()) camera = reconstruction.cameras[data.load_exif(shot_id)["camera"]] bs, Xs, ids = [], [], [] for track, obs in tracks_manager.get_shot_observations(shot_id).items(): if track in reconstruction.points: b = camera.pixel_bearing(obs.point) bs.append(b) Xs.append(reconstruction.points[track].coordinates) ids.append(track) bs = np.array(bs) Xs = np.array(Xs) if len(bs) < 5: return False, set(), {"num_common_points": len(bs)} T = multiview.absolute_pose_ransac(bs, Xs, threshold, 1000, 0.999) R = T[:, :3] t = T[:, 3] reprojected_bs = R.T.dot((Xs - t).T).T reprojected_bs /= np.linalg.norm(reprojected_bs, axis=1)[:, np.newaxis] inliers = np.linalg.norm(reprojected_bs - bs, axis=1) < threshold ninliers = int(sum(inliers)) logger.info("{} resection inliers: {} / {}".format(shot_id, ninliers, len(bs))) report: Dict[str, Any] = { "num_common_points": len(bs), "num_inliers": ninliers, } if ninliers >= min_inliers: R = T[:, :3].T t = -R.dot(T[:, 3]) assert shot_id not in reconstruction.shots new_shots = add_shot( data, reconstruction, rig_assignments, shot_id, pygeometry.Pose(R, t) ) if shot_id in rig_assignments: triangulate_shot_features( tracks_manager, reconstruction, new_shots, data.config ) for i, succeed in enumerate(inliers): if succeed: add_observation_to_reconstruction( tracks_manager, reconstruction, shot_id, ids[i] ) report["shots"] = list(new_shots) return True, new_shots, report else: return False, set(), report def corresponding_tracks( tracks1: Dict[str, pymap.Observation], tracks2: Dict[str, pymap.Observation] ) -> List[Tuple[str, str]]: features1 = {obs.id: t1 for t1, obs in tracks1.items()} corresponding_tracks = [] for t2, obs in tracks2.items(): feature_id = obs.id if feature_id in features1: corresponding_tracks.append((features1[feature_id], t2)) return corresponding_tracks def compute_common_tracks( reconstruction1: types.Reconstruction, reconstruction2: types.Reconstruction, tracks_manager1: pymap.TracksManager, tracks_manager2: pymap.TracksManager, ) -> List[Tuple[str, str]]: common_tracks = set() common_images = set(reconstruction1.shots.keys()).intersection( reconstruction2.shots.keys() ) all_shot_ids1 = set(tracks_manager1.get_shot_ids()) all_shot_ids2 = set(tracks_manager2.get_shot_ids()) for image in common_images: if image not in all_shot_ids1 or image not in all_shot_ids2: continue at_shot1 = tracks_manager1.get_shot_observations(image) at_shot2 = tracks_manager2.get_shot_observations(image) for t1, t2 in corresponding_tracks(at_shot1, at_shot2): if t1 in reconstruction1.points and t2 in reconstruction2.points: common_tracks.add((t1, t2)) return list(common_tracks) def resect_reconstruction( reconstruction1: types.Reconstruction, reconstruction2: types.Reconstruction, tracks_manager1: pymap.TracksManager, tracks_manager2: pymap.TracksManager, threshold: float, min_inliers: int, ) -> Tuple[bool, np.ndarray, List[Tuple[str, str]]]: """Compute a similarity transform `similarity` such as : reconstruction2 = T . reconstruction1 between two reconstruction 'reconstruction1' and 'reconstruction2'. Their respective tracks managers are used to find common tracks that are further used to compute the 3D similarity transform T using RANSAC. """ common_tracks = compute_common_tracks( reconstruction1, reconstruction2, tracks_manager1, tracks_manager2 ) worked, similarity, inliers = align_two_reconstruction( reconstruction1, reconstruction2, common_tracks, threshold ) if not worked or similarity is None: return False, np.ones((4, 4)), [] inliers = [common_tracks[inliers[i]] for i in range(len(inliers))] return True, similarity, inliers def add_observation_to_reconstruction( tracks_manager: pymap.TracksManager, reconstruction: types.Reconstruction, shot_id: str, track_id: str, ) -> None: observation = tracks_manager.get_observation(shot_id, track_id) reconstruction.add_observation(shot_id, track_id, observation) class TrackHandlerBase(ABC): """Interface for providing/retrieving tracks from/to 'TrackTriangulator'.""" @abstractmethod def get_observations(self, track_id: str) -> Dict[str, pymap.Observation]: """Returns the observations of 'track_id'""" pass @abstractmethod def store_track_coordinates(self, track_id: str, coordinates: np.ndarray) -> None: """Stores coordinates of triangulated track.""" pass @abstractmethod def store_inliers_observation(self, track_id: str, shot_id: str) -> None: """Called by the 'TrackTriangulator' for each track inlier found.""" pass class TrackHandlerTrackManager(TrackHandlerBase): """Provider that reads tracks from a 'TrackManager' object.""" tracks_manager: pymap.TracksManager reconstruction: types.Reconstruction def __init__( self, tracks_manager: pymap.TracksManager, reconstruction: types.Reconstruction, ) -> None: self.tracks_manager = tracks_manager self.reconstruction = reconstruction def get_observations(self, track_id: str) -> Dict[str, pymap.Observation]: """Return the observations of 'track_id', for all shots that appears in 'self.reconstruction.shots' """ return { k: v for k, v in self.tracks_manager.get_track_observations(track_id).items() if k in self.reconstruction.shots } def store_track_coordinates(self, track_id: str, coordinates: np.ndarray) -> None: """Stores coordinates of triangulated track.""" self.reconstruction.create_point(track_id, coordinates) def store_inliers_observation(self, track_id: str, shot_id: str) -> None: """Stores triangulation inliers in the tracks manager.""" observation = self.tracks_manager.get_observation(shot_id, track_id) self.reconstruction.add_observation(shot_id, track_id, observation) class TrackTriangulator: """Triangulate tracks in a reconstruction. Caches shot origin and rotation matrix """ # for getting shots reconstruction: types.Reconstruction # for storing tracks inliers tracks_handler: TrackHandlerBase # caches origins: Dict[str, np.ndarray] = {} rotation_inverses: Dict[str, np.ndarray] = {} Rts: Dict[str, np.ndarray] = {} def __init__( self, reconstruction: types.Reconstruction, tracks_handler: TrackHandlerBase ) -> None: """Build a triangulator for a specific reconstruction.""" self.reconstruction = reconstruction self.tracks_handler = tracks_handler self.origins = {} self.rotation_inverses = {} self.Rts = {} def triangulate_robust( self, track: str, reproj_threshold: float, min_ray_angle_degrees: float, iterations: int, ) -> None: """Triangulate track in a RANSAC way and add point to reconstruction.""" os, bs, ids = [], [], [] for shot_id, obs in self.tracks_handler.get_observations(track).items(): shot = self.reconstruction.shots[shot_id] os.append(self._shot_origin(shot)) b = shot.camera.pixel_bearing(np.array(obs.point)) r = self._shot_rotation_inverse(shot) bs.append(r.dot(b)) ids.append(shot_id) if len(ids) < 2: return os = np.array(os) bs = np.array(bs) best_inliers = [] best_point = None combinatiom_tried = set() ransac_tries = 11 # 0.99 proba, 60% inliers all_combinations = list(combinations(range(len(ids)), 2)) thresholds = len(os) * [reproj_threshold] for i in range(ransac_tries): random_id = int(np.random.rand() * (len(all_combinations) - 1)) if random_id in combinatiom_tried: continue i, j = all_combinations[random_id] combinatiom_tried.add(random_id) os_t = np.array([os[i], os[j]]) bs_t = np.array([bs[i], bs[j]]) valid_triangulation, X = pygeometry.triangulate_bearings_midpoint( os_t, bs_t, thresholds, np.radians(min_ray_angle_degrees), ) X = pygeometry.point_refinement(os_t, bs_t, X, iterations) if valid_triangulation: reprojected_bs = X - os reprojected_bs /= np.linalg.norm(reprojected_bs, axis=1)[:, np.newaxis] inliers = np.nonzero( np.linalg.norm(reprojected_bs - bs, axis=1) < reproj_threshold )[0].tolist() if len(inliers) > len(best_inliers): _, new_X = pygeometry.triangulate_bearings_midpoint( os[inliers], bs[inliers], len(inliers) * [reproj_threshold], np.radians(min_ray_angle_degrees), ) new_X = pygeometry.point_refinement( os[inliers], bs[inliers], X, iterations ) reprojected_bs = new_X - os reprojected_bs /= np.linalg.norm(reprojected_bs, axis=1)[ :, np.newaxis ] ls_inliers = np.nonzero( np.linalg.norm(reprojected_bs - bs, axis=1) < reproj_threshold )[0] if len(ls_inliers) > len(inliers): best_inliers = ls_inliers best_point = new_X.tolist() else: best_inliers = inliers best_point = X.tolist() pout = 0.99 inliers_ratio = float(len(best_inliers)) / len(ids) if inliers_ratio == 1.0: break optimal_iter = math.log(1.0 - pout) / math.log( 1.0 - inliers_ratio * inliers_ratio ) if optimal_iter <= i: break if len(best_inliers) > 1: self.tracks_handler.store_track_coordinates(track, best_point) for i in best_inliers: self.tracks_handler.store_inliers_observation(track, ids[i]) def triangulate( self, track: str, reproj_threshold: float, min_ray_angle_degrees: float, iterations: int, ) -> None: """Triangulate track and add point to reconstruction.""" os, bs, ids = [], [], [] for shot_id, obs in self.tracks_handler.get_observations(track).items(): shot = self.reconstruction.shots[shot_id] os.append(self._shot_origin(shot)) b = shot.camera.pixel_bearing(np.array(obs.point)) r = self._shot_rotation_inverse(shot) bs.append(r.dot(b)) ids.append(shot_id) if len(os) >= 2: thresholds = len(os) * [reproj_threshold] valid_triangulation, X = pygeometry.triangulate_bearings_midpoint( np.asarray(os), np.asarray(bs), thresholds, np.radians(min_ray_angle_degrees), ) if valid_triangulation: X = pygeometry.point_refinement( np.array(os), np.array(bs), X, iterations ) self.tracks_handler.store_track_coordinates(track, X.tolist()) for shot_id in ids: self.tracks_handler.store_inliers_observation(track, shot_id) def triangulate_dlt( self, track: str, reproj_threshold: float, min_ray_angle_degrees: float, iterations: int, ) -> None: """Triangulate track using DLT and add point to reconstruction.""" Rts, bs, os, ids = [], [], [], [] for shot_id, obs in self.tracks_handler.get_observations(track).items(): shot = self.reconstruction.shots[shot_id] os.append(self._shot_origin(shot)) Rts.append(self._shot_Rt(shot)) b = shot.camera.pixel_bearing(np.array(obs.point)) bs.append(b) ids.append(shot_id) if len(Rts) >= 2: e, X = pygeometry.triangulate_bearings_dlt( np.asarray(Rts), np.asarray(bs), reproj_threshold, np.radians(min_ray_angle_degrees), ) if e: X = pygeometry.point_refinement( np.array(os), np.array(bs), X, iterations ) self.tracks_handler.store_track_coordinates(track, X.tolist()) for shot_id in ids: self.tracks_handler.store_inliers_observation(track, shot_id) def _shot_origin(self, shot: pymap.Shot) -> np.ndarray: if shot.id in self.origins: return self.origins[shot.id] else: o = shot.pose.get_origin() self.origins[shot.id] = o return o def _shot_rotation_inverse(self, shot: pymap.Shot) -> np.ndarray: if shot.id in self.rotation_inverses: return self.rotation_inverses[shot.id] else: r = shot.pose.get_rotation_matrix().T self.rotation_inverses[shot.id] = r return r def _shot_Rt(self, shot: pymap.Shot) -> np.ndarray: if shot.id in self.Rts: return self.Rts[shot.id] else: r = shot.pose.get_Rt() self.Rts[shot.id] = r return r def triangulate_shot_features( tracks_manager: pymap.TracksManager, reconstruction: types.Reconstruction, shot_ids: Set[str], config: Dict[str, Any], ) -> None: """Reconstruct as many tracks seen in shot_id as possible.""" reproj_threshold = config["triangulation_threshold"] min_ray_angle = config["triangulation_min_ray_angle"] refinement_iterations = config["triangulation_refinement_iterations"] triangulator = TrackTriangulator( reconstruction, TrackHandlerTrackManager(tracks_manager, reconstruction) ) all_shots_ids = set(tracks_manager.get_shot_ids()) tracks_ids = { t for s in shot_ids if s in all_shots_ids for t in tracks_manager.get_shot_observations(s) } for track in tracks_ids: if track not in reconstruction.points: if config["triangulation_type"] == "ROBUST": triangulator.triangulate_robust( track, reproj_threshold, min_ray_angle, refinement_iterations ) elif config["triangulation_type"] == "FULL": triangulator.triangulate( track, reproj_threshold, min_ray_angle, refinement_iterations ) def retriangulate( tracks_manager: pymap.TracksManager, reconstruction: types.Reconstruction, config: Dict[str, Any], ) -> Dict[str, Any]: """Retrianguate all points""" chrono = Chronometer() report = {} report["num_points_before"] = len(reconstruction.points) threshold = config["triangulation_threshold"] min_ray_angle = config["triangulation_min_ray_angle"] refinement_iterations = config["triangulation_refinement_iterations"] reconstruction.points = {} all_shots_ids = set(tracks_manager.get_shot_ids()) triangulator = TrackTriangulator( reconstruction, TrackHandlerTrackManager(tracks_manager, reconstruction) ) tracks = set() for image in reconstruction.shots.keys(): if image in all_shots_ids: tracks.update(tracks_manager.get_shot_observations(image).keys()) for track in tracks: if config["triangulation_type"] == "ROBUST": triangulator.triangulate_robust( track, threshold, min_ray_angle, refinement_iterations ) elif config["triangulation_type"] == "FULL": triangulator.triangulate( track, threshold, min_ray_angle, refinement_iterations ) report["num_points_after"] = len(reconstruction.points) chrono.lap("retriangulate") report["wall_time"] = chrono.total_time() return report def get_error_distribution(points: Dict[str, pymap.Landmark]) -> Tuple[float, float]: all_errors = [] for track in points.values(): all_errors += track.reprojection_errors.values() robust_mean = np.median(all_errors, axis=0) robust_std = 1.486 * np.median( np.linalg.norm(np.array(all_errors) - robust_mean, axis=1) ) return robust_mean, robust_std def get_actual_threshold( config: Dict[str, Any], points: Dict[str, pymap.Landmark] ) -> float: filter_type = config["bundle_outlier_filtering_type"] if filter_type == "FIXED": return config["bundle_outlier_fixed_threshold"] elif filter_type == "AUTO": mean, std = get_error_distribution(points) return config["bundle_outlier_auto_ratio"] * np.linalg.norm(mean + std) else: return 1.0 def remove_outliers( reconstruction: types.Reconstruction, config: Dict[str, Any], points: Optional[Dict[str, pymap.Landmark]] = None, ) -> int: """Remove points with large reprojection error. A list of point ids to be processed can be given in ``points``. """ if points is None: points = reconstruction.points threshold_sqr = get_actual_threshold(config, reconstruction.points) ** 2 outliers = [] for point_id in points: for shot_id, error in reconstruction.points[ point_id ].reprojection_errors.items(): error_sqr = error[0] ** 2 + error[1] ** 2 if error_sqr > threshold_sqr: outliers.append((point_id, shot_id)) track_ids = set() for track, shot_id in outliers: reconstruction.map.remove_observation(shot_id, track) track_ids.add(track) for track in track_ids: if track in reconstruction.points: lm = reconstruction.points[track] if lm.number_of_observations() < 2: reconstruction.map.remove_landmark(lm) logger.info("Removed outliers: {}".format(len(outliers))) return len(outliers) def shot_lla_and_compass( shot: pymap.Shot, reference: types.TopocentricConverter ) -> Tuple[float, float, float, float]: """Lat, lon, alt and compass of the reconstructed shot position.""" topo = shot.pose.get_origin() lat, lon, alt = reference.to_lla(*topo) dz = shot.pose.get_R_cam_to_world()[:, 2] angle = np.rad2deg(np.arctan2(dz[0], dz[1])) angle = (angle + 360) % 360 return lat, lon, alt, angle def align_two_reconstruction( r1: types.Reconstruction, r2: types.Reconstruction, common_tracks: List[Tuple[str, str]], threshold: float, ) -> Tuple[bool, Optional[np.ndarray], List[int]]: """Estimate similarity transform T between two, reconstructions r1 and r2 such as r2 = T . r1 """ t1, t2 = r1.points, r2.points if len(common_tracks) > 6: p1 = np.array([t1[t[0]].coordinates for t in common_tracks]) p2 = np.array([t2[t[1]].coordinates for t in common_tracks]) # 3 samples / 100 trials / 50% outliers = 0.99 probability # with probability = 1-(1-(1-outlier)^model)^trial T, inliers = multiview.fit_similarity_transform( p1, p2, max_iterations=100, threshold=threshold ) if len(inliers) > 0: return True, T, list(inliers) return False, None, [] def merge_two_reconstructions( r1: types.Reconstruction, r2: types.Reconstruction, config: Dict[str, Any], threshold: float = 1, ) -> List[types.Reconstruction]: """Merge two reconstructions with common tracks IDs.""" common_tracks = list(set(r1.points) & set(r2.points)) worked, T, inliers = align_two_reconstruction(r1, r2, common_tracks, threshold) if T and worked and len(inliers) >= 10: s, A, b = multiview.decompose_similarity_transform(T) r1p = r1 apply_similarity(r1p, s, A, b) r = r2 r.shots.update(r1p.shots) r.points.update(r1p.points) align_reconstruction(r, [], config) return [r] else: return [r1, r2] def merge_reconstructions( reconstructions: List[types.Reconstruction], config: Dict[str, Any] ) -> List[types.Reconstruction]: """Greedily merge reconstructions with common tracks.""" num_reconstruction = len(reconstructions) ids_reconstructions = np.arange(num_reconstruction) remaining_reconstruction = ids_reconstructions reconstructions_merged = [] num_merge = 0 for (i, j) in combinations(ids_reconstructions, 2): if (i in remaining_reconstruction) and (j in remaining_reconstruction): r = merge_two_reconstructions( reconstructions[i], reconstructions[j], config ) if len(r) == 1: remaining_reconstruction = list(set(remaining_reconstruction) - {i, j}) for k in remaining_reconstruction: rr = merge_two_reconstructions(r[0], reconstructions[k], config) if len(r) == 2: break else: r = rr remaining_reconstruction = list( set(remaining_reconstruction) - {k} ) reconstructions_merged.append(r[0]) num_merge += 1 for k in remaining_reconstruction: reconstructions_merged.append(reconstructions[k]) logger.info("Merged {0} reconstructions".format(num_merge)) return reconstructions_merged def paint_reconstruction( data: DataSetBase, tracks_manager: pymap.TracksManager, reconstruction: types.Reconstruction, ) -> None: """Set the color of the points from the color of the tracks.""" for k, point in reconstruction.points.items(): point.color = list( map( float, next( iter(tracks_manager.get_track_observations(str(k)).values()) ).color, ) ) class ShouldBundle: """Helper to keep track of when to run bundle.""" interval: int new_points_ratio: float reconstruction: types.Reconstruction def __init__(self, data: DataSetBase, reconstruction: types.Reconstruction) -> None: self.interval = data.config["bundle_interval"] self.new_points_ratio = data.config["bundle_new_points_ratio"] self.reconstruction = reconstruction self.done() def should(self) -> bool: max_points = self.num_points_last * self.new_points_ratio max_shots = self.num_shots_last + self.interval return ( len(self.reconstruction.points) >= max_points or len(self.reconstruction.shots) >= max_shots ) def done(self) -> None: self.num_points_last = len(self.reconstruction.points) self.num_shots_last = len(self.reconstruction.shots) class ShouldRetriangulate: """Helper to keep track of when to re-triangulate.""" active: bool ratio: float reconstruction: types.Reconstruction def __init__(self, data, reconstruction) -> None: self.active = data.config["retriangulation"] self.ratio = data.config["retriangulation_ratio"] self.reconstruction = reconstruction self.done() def should(self) -> bool: max_points = self.num_points_last * self.ratio return self.active and len(self.reconstruction.points) > max_points def done(self) -> None: self.num_points_last = len(self.reconstruction.points) def grow_reconstruction( data: DataSetBase, tracks_manager: pymap.TracksManager, reconstruction: types.Reconstruction, images: Set[str], gcp: List[pymap.GroundControlPoint], ) -> Tuple[types.Reconstruction, Dict[str, Any]]: """Incrementally add shots to an initial reconstruction.""" config = data.config report = {"steps": []} camera_priors = data.load_camera_models() rig_camera_priors = data.load_rig_cameras() paint_reconstruction(data, tracks_manager, reconstruction) align_reconstruction(reconstruction, gcp, config) bundle(reconstruction, camera_priors, rig_camera_priors, None, config) remove_outliers(reconstruction, config) paint_reconstruction(data, tracks_manager, reconstruction) should_bundle = ShouldBundle(data, reconstruction) should_retriangulate = ShouldRetriangulate(data, reconstruction) while True: if config["save_partial_reconstructions"]: paint_reconstruction(data, tracks_manager, reconstruction) data.save_reconstruction( [reconstruction], "reconstruction.{}.json".format( datetime.datetime.now().isoformat().replace(":", "_") ), ) candidates = reconstructed_points_for_images( tracks_manager, reconstruction, images ) if not candidates: break logger.info("-------------------------------------------------------") threshold = data.config["resection_threshold"] min_inliers = data.config["resection_min_inliers"] for image, _ in candidates: ok, new_shots, resrep = resect( data, tracks_manager, reconstruction, image, threshold, min_inliers, ) if not ok: continue images -= new_shots bundle_shot_poses( reconstruction, new_shots, camera_priors, rig_camera_priors, data.config, ) logger.info(f"Adding {' and '.join(new_shots)} to the reconstruction") step = { "images": list(new_shots), "resection": resrep, "memory_usage": current_memory_usage(), } report["steps"].append(step) np_before = len(reconstruction.points) triangulate_shot_features(tracks_manager, reconstruction, new_shots, config) np_after = len(reconstruction.points) step["triangulated_points"] = np_after - np_before if should_retriangulate.should(): logger.info("Re-triangulating") align_reconstruction(reconstruction, gcp, config) b1rep = bundle( reconstruction, camera_priors, rig_camera_priors, None, config ) rrep = retriangulate(tracks_manager, reconstruction, config) b2rep = bundle( reconstruction, camera_priors, rig_camera_priors, None, config ) remove_outliers(reconstruction, config) step["bundle"] = b1rep step["retriangulation"] = rrep step["bundle_after_retriangulation"] = b2rep should_retriangulate.done() should_bundle.done() elif should_bundle.should(): align_reconstruction(reconstruction, gcp, config) brep = bundle( reconstruction, camera_priors, rig_camera_priors, None, config ) remove_outliers(reconstruction, config) step["bundle"] = brep should_bundle.done() elif config["local_bundle_radius"] > 0: bundled_points, brep = bundle_local( reconstruction, camera_priors, rig_camera_priors, None, image, config, ) remove_outliers(reconstruction, config, bundled_points) step["local_bundle"] = brep break else: logger.info("Some images can not be added") break logger.info("-------------------------------------------------------") align_result = align_reconstruction(reconstruction, gcp, config, bias_override=True) if not align_result and config["bundle_compensate_gps_bias"]: overidden_config = config.copy() overidden_config["bundle_compensate_gps_bias"] = False config = overidden_config bundle(reconstruction, camera_priors, rig_camera_priors, gcp, config) remove_outliers(reconstruction, config) paint_reconstruction(data, tracks_manager, reconstruction) return reconstruction, report def triangulation_reconstruction( data: DataSetBase, tracks_manager: pymap.TracksManager ) -> Tuple[Dict[str, Any], List[types.Reconstruction]]: """Run the triangulation reconstruction pipeline.""" logger.info("Starting triangulation reconstruction") report = {} chrono = Chronometer() images = tracks_manager.get_shot_ids() data.init_reference(images) camera_priors = data.load_camera_models() rig_camera_priors = data.load_rig_cameras() gcp = data.load_ground_control_points() reconstruction = helpers.reconstruction_from_metadata(data, images) config = data.config config_override = config.copy() config_override["triangulation_type"] = "ROBUST" config_override["bundle_max_iterations"] = 10 report["steps"] = [] outer_iterations = 3 inner_iterations = 5 for i in range(outer_iterations): rrep = retriangulate(tracks_manager, reconstruction, config_override) triangulated_points = rrep["num_points_after"] logger.info( f"Triangulation SfM. Outer iteration {i}, triangulated {triangulated_points} points." ) for j in range(inner_iterations): if config_override["save_partial_reconstructions"]: paint_reconstruction(data, tracks_manager, reconstruction) data.save_reconstruction( [reconstruction], f"reconstruction.{i*inner_iterations+j}.json" ) step = {} logger.info(f"Triangulation SfM. Inner iteration {j}, running bundle ...") align_reconstruction(reconstruction, gcp, config_override) b1rep = bundle( reconstruction, camera_priors, rig_camera_priors, None, config_override ) remove_outliers(reconstruction, config_override) step["bundle"] = b1rep step["retriangulation"] = rrep report["steps"].append(step) logger.info("Triangulation SfM done.") logger.info("-------------------------------------------------------") chrono.lap("compute_reconstructions") report["wall_times"] = dict(chrono.lap_times()) align_result = align_reconstruction(reconstruction, gcp, config, bias_override=True) if not align_result and config["bundle_compensate_gps_bias"]: overidden_bias_config = config.copy() overidden_bias_config["bundle_compensate_gps_bias"] = False config = overidden_bias_config bundle(reconstruction, camera_priors, rig_camera_priors, gcp, config) remove_outliers(reconstruction, config_override) paint_reconstruction(data, tracks_manager, reconstruction) return report, [reconstruction] def incremental_reconstruction( data: DataSetBase, tracks_manager: pymap.TracksManager ) -> Tuple[Dict[str, Any], List[types.Reconstruction]]: """Run the entire incremental reconstruction pipeline.""" logger.info("Starting incremental reconstruction") report = {} chrono = Chronometer() images = tracks_manager.get_shot_ids() data.init_reference(images) remaining_images = set(images) gcp = data.load_ground_control_points() common_tracks = tracking.all_common_tracks_with_features(tracks_manager) reconstructions = [] pairs = compute_image_pairs(common_tracks, data) chrono.lap("compute_image_pairs") report["num_candidate_image_pairs"] = len(pairs) report["reconstructions"] = [] for im1, im2 in pairs: if im1 in remaining_images and im2 in remaining_images: rec_report = {} report["reconstructions"].append(rec_report) _, p1, p2 = common_tracks[im1, im2] reconstruction, rec_report["bootstrap"] = bootstrap_reconstruction( data, tracks_manager, im1, im2, p1, p2 ) if reconstruction: remaining_images -= set(reconstruction.shots) reconstruction, rec_report["grow"] = grow_reconstruction( data, tracks_manager, reconstruction, remaining_images, gcp, ) reconstructions.append(reconstruction) reconstructions = sorted(reconstructions, key=lambda x: -len(x.shots)) for k, r in enumerate(reconstructions): logger.info( "Reconstruction {}: {} images, {} points".format( k, len(r.shots), len(r.points) ) ) logger.info("{} partial reconstructions in total.".format(len(reconstructions))) chrono.lap("compute_reconstructions") report["wall_times"] = dict(chrono.lap_times()) report["not_reconstructed_images"] = list(remaining_images) return report, reconstructions def reconstruct_from_prior( data: DataSetBase, tracks_manager: pymap.TracksManager, rec_prior: types.Reconstruction, ) -> Tuple[Dict[str, Any], types.Reconstruction]: """Retriangulate a new reconstruction from the rec_prior""" reconstruction = types.Reconstruction() reconstruction.reference = rec_prior.reference report = {} rec_report = {} report["retriangulate"] = [rec_report] images = tracks_manager.get_shot_ids() # copy prior poses, cameras reconstruction.cameras = rec_prior.cameras for shot in rec_prior.shots.values(): reconstruction.add_shot(shot) prior_images = set(rec_prior.shots) remaining_images = set(images) - prior_images rec_report["num_prior_images"] = len(prior_images) rec_report["num_remaining_images"] = len(remaining_images) # Start with the known poses triangulate_shot_features(tracks_manager, reconstruction, prior_images, data.config) paint_reconstruction(data, tracks_manager, reconstruction) report["not_reconstructed_images"] = list(remaining_images) return report, reconstruction class Chronometer: def __init__(self) -> None: self.start() def start(self) -> None: t = timer() lap = ("start", 0, t) self.laps = [lap] self.laps_dict = {"start": lap} def lap(self, key: str) -> None: t = timer() dt = t - self.laps[-1][2] lap = (key, dt, t) self.laps.append(lap) self.laps_dict[key] = lap def lap_time(self, key: str) -> float: return self.laps_dict[key][1] def lap_times(self) -> List[Tuple[str, float]]: return [(k, dt) for k, dt, t in self.laps[1:]] def total_time(self) -> float: return self.laps[-1][2] - self.laps[0][2]