opensfm/rig.py (322 lines of code) (raw):

"""Tool for handling rigs""" import logging import os import random import re from typing import Dict, Tuple, List, Optional, Set, Iterable, TYPE_CHECKING import networkx as nx import numpy as np import scipy.spatial as spatial from opensfm import reconstruction as orec, actions, pygeometry, pymap, types if TYPE_CHECKING: from opensfm.dataset import DataSet logger = logging.getLogger(__name__) TRigPatterns = Dict[str, str] TRigCameraGroup = Set[str] TRigImage = Tuple[str, str] TRigInstance = List[TRigImage] INCOMPLETE_INSTANCE_GROUP = "INCOMPLETE_INSTANCE_GROUP" INCOMPLETE_INSTANCE_ID = "INCOMPLETE_INSTANCE_ID" def default_rig_cameras(camera_ids: Iterable[str]) -> Dict[str, pymap.RigCamera]: """Return per-camera models default rig cameras (identity pose).""" default_rig_cameras = {} for camera_id in camera_ids: default_rig_cameras[camera_id] = pymap.RigCamera(pygeometry.Pose(), camera_id) return default_rig_cameras def rig_assignments_per_image( rig_assignments: Dict[str, List[Tuple[str, str]]], ) -> Dict[str, Tuple[str, str, List[str]]]: """Return rig assignments data for each image.""" assignments_per_image = {} for instance_id, instance in rig_assignments.items(): instance_shots = [s[0] for s in instance] for (shot_id, rig_camera_id) in instance: assignments_per_image[shot_id] = ( f"{instance_id}", rig_camera_id, instance_shots, ) return assignments_per_image def find_image_rig( image: str, rig_patterns: TRigPatterns ) -> Tuple[Optional[str], Optional[str]]: """Given an image and candidates rig model patterns, return the RigCamera ID/Instance Member ID this image belongs to. """ for rig_camera_id, pattern in rig_patterns.items(): instance_member_id = re.sub(pattern, "", image) if instance_member_id == "": continue if instance_member_id != image: return (rig_camera_id, instance_member_id) return None, None def create_instances_with_patterns( images: List[str], rig_patterns: TRigPatterns ) -> Tuple[Dict[str, TRigInstance], List[str]]: """Using the provided patterns, group images that should belong to the same rig instances. It will also check that a RigCamera belong to exactly one group of RigCameras Returns : A dict (by instance ID) of list of tuple of (image, rig camera) """ per_instance_id: Dict[str, TRigInstance] = {} for image in images: rig_camera_id, instance_member_id = find_image_rig(image, rig_patterns) if not rig_camera_id or not instance_member_id: instance_member_id = INCOMPLETE_INSTANCE_GROUP rig_camera_id = INCOMPLETE_INSTANCE_ID if instance_member_id not in per_instance_id: per_instance_id[instance_member_id] = [] per_instance_id[instance_member_id].append((image, rig_camera_id)) per_complete_instance_id: Dict[str, TRigInstance] = {} single_shots: List[str] = [] groups_per_camera: Dict[str, TRigCameraGroup] = {} for instance_id, cameras in per_instance_id.items(): if instance_id == INCOMPLETE_INSTANCE_GROUP: single_shots += [im for im, _ in cameras] continue cameras_group = {c for _, c in cameras} for _, c in cameras: size_new = len(cameras_group) override = c in groups_per_camera and size_new >= len(groups_per_camera[c]) is_new = c not in groups_per_camera if is_new or override: groups_per_camera[c] = cameras_group else: logger.warning( ( f"Rig camera {c} already belongs to the rig camera group {groups_per_camera[c]}." f"This rig camera is probably part of an incomplete instance : {cameras_group}" ) ) per_complete_instance_id[instance_id] = cameras return per_complete_instance_id, single_shots def group_instances( rig_instances: Dict[str, TRigInstance] ) -> Dict[str, List[TRigInstance]]: per_rig_camera_group: Dict[str, List[TRigInstance]] = {} for cameras in rig_instances.values(): cameras_group = ", ".join(sorted({c for _, c in cameras})) if cameras_group not in per_rig_camera_group: per_rig_camera_group[cameras_group] = [] per_rig_camera_group[cameras_group].append(cameras) return per_rig_camera_group def propose_subset_dataset_from_instances( data: "DataSet", rig_instances: Dict[str, TRigInstance], name: str ) -> Iterable[Tuple["DataSet", List[List[Tuple[str, str]]]]]: """Given a list of images grouped by rigs instances, infitely propose random subset of images and create a dataset subset with the provided name from them. Returns : Yield infinitely DataSet containing a subset of images containing enough rig instances """ per_rig_camera_group = group_instances(rig_instances) data.init_reference() reference = data.load_reference() instances_to_pick = {} for key, instances in per_rig_camera_group.items(): # build GPS look-up tree gpses = [] for i, instance in enumerate(instances): all_gps = [] for image, _ in instance: gps = data.load_exif(image)["gps"] all_gps.append( reference.to_topocentric(gps["latitude"], gps["longitude"], 0) ) gpses.append((i, np.average(np.array(all_gps), axis=0))) tree = spatial.cKDTree([x[1] for x in gpses]) # build NN-graph and split by connected components nn = 6 instances_graph = nx.Graph() for i, gps in gpses: distances, neighbors = tree.query(gps, k=nn) for d, n in zip(distances, neighbors): if i == n or n >= len(gpses): continue instances_graph.add_edge(i, n, weight=d) all_components = sorted( nx.algorithms.components.connected_components(instances_graph), key=len, reverse=True, ) logger.info(f"Found {len(all_components)} connected components") if len(all_components) < 1: continue # keep the biggest one biggest_component = all_components[0] logger.info(f"Best component has {len(biggest_component)} instances") instances_to_pick[key] = biggest_component random.seed(42) while True: total_instances = [] subset_images = [] for key, instances in instances_to_pick.items(): all_instances = per_rig_camera_group[key] instances_sorted = sorted( [all_instances[i] for i in instances], key=lambda x: data.load_exif(x[0][0])["capture_time"], ) subset_size = data.config["rig_calibration_subset_size"] random_index = random.randint(0, len(instances_sorted) - 1) instances_calibrate = instances_sorted[ max([0, random_index - int(subset_size / 2)]) : min( [random_index + int(subset_size / 2), len(instances_sorted) - 1] ) ] for instance in instances_calibrate: subset_images += [x[0] for x in instance] total_instances += instances_calibrate data.io_handler.rm_if_exist(os.path.join(data.data_path, name)) yield data.subset(name, subset_images), total_instances def compute_relative_pose( pose_instances: List[List[Tuple[pymap.Shot, str]]], ) -> Dict[str, pymap.RigCamera]: """Compute a rig model relatives poses given poses grouped by rig instance.""" # Put all poses instances into some canonical frame taken as the mean of their R|t centered_pose_instances = [] for instance in pose_instances: origin_center = np.zeros(3) rotation_center = np.zeros(3) for shot, _ in instance: rotation_center += shot.pose.rotation origin_center += shot.pose.get_origin() origin_center /= len(instance) rotation_center /= len(instance) centered_pose_instance = [] for shot, rig_camera_id in instance: instance_pose = pygeometry.Pose(rotation_center) instance_pose.set_origin(origin_center) instance_pose_camera = shot.pose.relative_to(instance_pose) centered_pose_instance.append( ( instance_pose_camera, rig_camera_id, shot.camera.id, ) ) centered_pose_instances.append(centered_pose_instance) # Average canonical poses per RigCamera ID average_origin, average_rotation, count_poses, camera_ids = {}, {}, {}, {} for centered_pose_instance in centered_pose_instances: for pose, rig_camera_id, camera_id in centered_pose_instance: if rig_camera_id not in average_origin: average_origin[rig_camera_id] = np.zeros(3) average_rotation[rig_camera_id] = np.zeros(3) count_poses[rig_camera_id] = 0 average_origin[rig_camera_id] += pose.get_origin() average_rotation[rig_camera_id] += pose.rotation camera_ids[rig_camera_id] = camera_id count_poses[rig_camera_id] += 1 # Construct final RigCamera results rig_cameras: Dict[str, pymap.RigCamera] = {} for rig_camera_id, count in count_poses.items(): o = average_origin[rig_camera_id] / count r = average_rotation[rig_camera_id] / count pose = pygeometry.Pose(r) pose.set_origin(o) rig_cameras[rig_camera_id] = pymap.RigCamera(pose, rig_camera_id) return rig_cameras def create_rig_cameras_from_reconstruction( reconstruction: types.Reconstruction, rig_instances: Dict[str, TRigInstance] ) -> Dict[str, pymap.RigCamera]: """Compute rig cameras poses, given a reconstruction and rig instances's shots.""" rig_cameras: Dict[str, pymap.RigCamera] = {} reconstructions_shots = set(reconstruction.shots) logger.info(f"Computing rig cameras pose using {len(reconstructions_shots)} shots") per_rig_camera_group = group_instances(rig_instances) logger.info(f"Found {len(per_rig_camera_group)} rig cameras groups") for instances in sorted(per_rig_camera_group.values(), key=lambda x: -len(x)): pose_groups = [] for instance in instances: if any( True if shot_id not in reconstructions_shots else False for shot_id, _ in instance ): continue pose_groups.append( [ (reconstruction.shots[shot_id], rig_camera_id) for shot_id, rig_camera_id in instance ] ) for rig_camera_id, rig_camera in compute_relative_pose(pose_groups).items(): if rig_camera_id in rig_cameras: logger.warning( f"Ignoring {rig_camera_id} as it was already computed from a bigger set of instances" ) else: rig_cameras[rig_camera_id] = rig_camera return rig_cameras def create_rigs_with_pattern(data: "DataSet", patterns: TRigPatterns): """Create rig data (`rig_cameras.json` and `rig_assignments.json`) by performing pattern matching to group images belonging to the same instances, followed by a bit of ad-hoc SfM to find some initial relative poses. """ # Construct instances assignments for each rig instances_per_rig, single_shots = create_instances_with_patterns( data.images(), patterns ) for rig_id, instances in instances_per_rig.items(): logger.info( f"Found {len(instances)} shots for instance {rig_id} using pattern matching." ) logger.info(f"Found {len(single_shots)} single shots using pattern matching.") # Create some random subset DataSet with enough images from each rig and run SfM count = 0 max_rounds = data.config["rig_calibration_max_rounds"] best_reconstruction = None best_rig_cameras = None for subset_data, instances in propose_subset_dataset_from_instances( data, instances_per_rig, "rig_calibration" ): if count > max_rounds: break count += 1 if len(subset_data.images()) == 0: continue # Run a bit of SfM without any rig logger.info( f"Running SfM on a subset of {len(subset_data.images())} images. Round {count}/{max_rounds}" ) actions.extract_metadata.run_dataset(subset_data) actions.detect_features.run_dataset(subset_data) actions.match_features.run_dataset(subset_data) actions.create_tracks.run_dataset(subset_data) actions.reconstruct.run_dataset( subset_data, orec.ReconstructionAlgorithm.INCREMENTAL ) reconstructions = subset_data.load_reconstruction() if len(reconstructions) == 0: logger.error("Couldn't run sucessful SfM on the subset of images.") continue reconstruction = reconstructions[0] # Compute some relative poses rig_cameras = create_rig_cameras_from_reconstruction( reconstruction, instances_per_rig ) found_cameras = {c for i in instances_per_rig.values() for _, c in i} if set(rig_cameras.keys()) != found_cameras: logger.error( f"Calibrated {len(rig_cameras)} whereas {len(found_cameras)} were requested. Rig creation failed." ) continue reconstructed_instances = count_reconstructed_instances( instances, reconstruction ) logger.info( f"reconstructed {reconstructed_instances} instances over {len(instances)}" ) if ( reconstructed_instances < len(instances) * data.config["rig_calibration_completeness"] ): logger.error( f"Not enough reconstructed instances: {reconstructed_instances} instances over {len(instances)} instances." ) continue best_reconstruction = reconstruction best_rig_cameras = rig_cameras break if best_reconstruction and best_rig_cameras: logger.info( f"Found a candidate for rig calibration with {len(best_reconstruction.shots)} shots" ) data.save_rig_cameras(best_rig_cameras) data.save_rig_assignments(instances_per_rig) else: logger.error( "Could not run any sucessful SfM on images subset for rig calibration" ) def count_reconstructed_instances( instances: List[List[Tuple[str, str]]], reconstruction: types.Reconstruction ) -> int: instances_map = {} instances_count = {} for i, instance in enumerate(instances): instances_count[i] = len(instance) for shot_id, _ in instance: instances_map[shot_id] = i for s in reconstruction.shots: instances_count[instances_map[s]] -= 1 return len(instances) - sum(1 for i in instances_count.values() if i > 0) def same_rig_shot(meta1, meta2): """True if shots taken at the same time on a rig.""" have_gps = ( "gps" in meta1 and "gps" in meta2 and "latitude" in meta1["gps"] and "latitude" in meta2["gps"] ) same_gps = ( have_gps and meta1["gps"]["latitude"] == meta2["gps"]["latitude"] and meta1["gps"]["longitude"] == meta2["gps"]["longitude"] ) same_time = meta1["capture_time"] == meta2["capture_time"] return same_gps and same_time