opensfm/undistort.py (338 lines of code) (raw):

import itertools import logging from typing import Iterator, List, Dict, Optional import cv2 import numpy as np from opensfm import ( features, log, pygeometry, pymap, transformations as tf, types, features_processing, ) from opensfm.context import parallel_map from opensfm.dataset import UndistortedDataSet from opensfm.dataset_base import DataSetBase logger = logging.getLogger(__name__) def undistort_reconstruction( tracks_manager: Optional[pymap.TracksManager], reconstruction: types.Reconstruction, data: DataSetBase, udata: UndistortedDataSet, ) -> Dict[pymap.Shot, List[pymap.Shot]]: all_images = set(data.images()) image_format = data.config["undistorted_image_format"] urec = types.Reconstruction() urec.points = reconstruction.points urec.reference = reconstruction.reference rig_instance_count = itertools.count() utracks_manager = pymap.TracksManager() logger.debug("Undistorting the reconstruction") undistorted_shots = {} for shot in reconstruction.shots.values(): if shot.id not in all_images: logger.warning( f"Not undistorting {shot.id} as it is missing from the dataset's input images." ) continue if shot.camera.projection_type == "perspective": urec.add_camera(perspective_camera_from_perspective(shot.camera)) subshots = [get_shot_with_different_camera(urec, shot, image_format)] elif shot.camera.projection_type == "brown": urec.add_camera(perspective_camera_from_brown(shot.camera)) subshots = [get_shot_with_different_camera(urec, shot, image_format)] elif shot.camera.projection_type in ["fisheye", "fisheye_opencv"]: urec.add_camera(perspective_camera_from_fisheye(shot.camera)) subshots = [get_shot_with_different_camera(urec, shot, image_format)] elif pygeometry.Camera.is_panorama(shot.camera.projection_type): subshot_width = int(data.config["depthmap_resolution"]) subshots = perspective_views_of_a_panorama( shot, subshot_width, urec, image_format, rig_instance_count ) else: logger.warning(f"Not undistorting {shot.id} with unknown camera type.") continue for subshot in subshots: if tracks_manager: add_subshot_tracks(tracks_manager, utracks_manager, shot, subshot) undistorted_shots[shot.id] = subshots udata.save_undistorted_reconstruction([urec]) if tracks_manager: udata.save_undistorted_tracks_manager(utracks_manager) udata.save_undistorted_shot_ids( { shot_id: [ushot.id for ushot in ushots] for shot_id, ushots in undistorted_shots.items() } ) return undistorted_shots def undistort_reconstruction_with_images( tracks_manager: Optional[pymap.TracksManager], reconstruction: types.Reconstruction, data: DataSetBase, udata: UndistortedDataSet, skip_images: bool = False, ) -> Dict[pymap.Shot, List[pymap.Shot]]: undistorted_shots = undistort_reconstruction( tracks_manager, reconstruction, data, udata ) if not skip_images: arguments = [] for shot_id, subshots in undistorted_shots.items(): arguments.append((reconstruction.shots[shot_id], subshots, data, udata)) processes = data.config["processes"] # trim processes to available memory, otherwise, pray mem_available = log.memory_available() if mem_available: # Use 90% of available memory ratio_use = 0.9 mem_available *= ratio_use processing_size = data.config["depthmap_resolution"] output_size = processing_size * processing_size * 4 / 1024 / 1024 undistort_factor = 3 # 1 for original image, 2 for (U,V) remapping input_size = features_processing.average_image_size(data) * undistort_factor processing_size = output_size + input_size processes = min(max(1, int(mem_available / processing_size)), processes) logger.info( f"Undistorting in parallel with {processes} processes ({processing_size} MB per image)" ) parallel_map(undistort_image_and_masks, arguments, processes) return undistorted_shots def undistort_image_and_masks(arguments) -> None: shot, undistorted_shots, data, udata = arguments log.setup() logger.debug("Undistorting image {}".format(shot.id)) max_size = data.config["undistorted_image_max_size"] # Undistort image image = data.load_image(shot.id, unchanged=True, anydepth=True) if image is not None: undistorted = undistort_image( shot, undistorted_shots, image, cv2.INTER_AREA, max_size ) for k, v in undistorted.items(): udata.save_undistorted_image(k, v) # Undistort mask mask = data.load_mask(shot.id) if mask is not None: undistorted = undistort_image( shot, undistorted_shots, mask, cv2.INTER_NEAREST, max_size ) for k, v in undistorted.items(): udata.save_undistorted_mask(k, v) # Undistort segmentation segmentation = data.load_segmentation(shot.id) if segmentation is not None: undistorted = undistort_image( shot, undistorted_shots, segmentation, cv2.INTER_NEAREST, max_size ) for k, v in undistorted.items(): udata.save_undistorted_segmentation(k, v) def undistort_image( shot: pymap.Shot, undistorted_shots: List[pymap.Shot], original: Optional[np.ndarray], interpolation, max_size: int, ) -> Dict[str, np.ndarray]: """Undistort an image into a set of undistorted ones. Args: shot: the distorted shot undistorted_shots: the set of undistorted shots covering the distorted shot field of view. That is 1 for most camera types and 6 for spherical cameras. original: the original distorted image array. interpolation: the opencv interpolation flag to use. max_size: maximum size of the undistorted image. """ if original is None: return {} projection_type = shot.camera.projection_type if projection_type in ["perspective", "brown", "fisheye", "fisheye_opencv"]: [undistorted_shot] = undistorted_shots new_camera = undistorted_shot.camera height, width = original.shape[:2] map1, map2 = pygeometry.compute_camera_mapping( shot.camera, new_camera, width, height ) undistorted = cv2.remap(original, map1, map2, interpolation) return {undistorted_shot.id: scale_image(undistorted, max_size)} elif pygeometry.Camera.is_panorama(projection_type): subshot_width = undistorted_shots[0].camera.width width = 4 * subshot_width height = width // 2 image = cv2.resize(original, (width, height), interpolation=interpolation) mint = cv2.INTER_LINEAR if interpolation == cv2.INTER_AREA else interpolation res = {} for undistorted_shot in undistorted_shots: undistorted = render_perspective_view_of_a_panorama( image, shot, undistorted_shot, mint ) res[undistorted_shot.id] = scale_image(undistorted, max_size) return res else: raise NotImplementedError( "Undistort not implemented for projection type: {}".format( shot.camera.projection_type ) ) def scale_image(image: np.ndarray, max_size: int) -> np.ndarray: """Scale an image not to exceed max_size.""" height, width = image.shape[:2] factor = max_size / float(max(height, width)) if factor >= 1: return image width = int(round(width * factor)) height = int(round(height * factor)) return cv2.resize(image, (width, height), interpolation=cv2.INTER_NEAREST) def add_image_format_extension(shot_id: str, image_format: str) -> str: if shot_id.endswith(f".{image_format}"): return shot_id else: return f"{shot_id}.{image_format}" def get_shot_with_different_camera( urec: types.Reconstruction, shot: pymap.Shot, image_format: str, ) -> pymap.Shot: new_shot_id = add_image_format_extension(shot.id, image_format) new_shot = urec.create_shot(new_shot_id, shot.camera.id, shot.pose) new_shot.metadata = shot.metadata return new_shot def perspective_camera_from_perspective( distorted: pygeometry.Camera, ) -> pygeometry.Camera: """Create an undistorted camera from a distorted.""" camera = pygeometry.Camera.create_perspective(distorted.focal, 0.0, 0.0) camera.id = distorted.id camera.width = distorted.width camera.height = distorted.height return camera def perspective_camera_from_brown(brown: pygeometry.Camera) -> pygeometry.Camera: """Create a perspective camera from a Brown camera.""" camera = pygeometry.Camera.create_perspective( brown.focal * (1 + brown.aspect_ratio) / 2.0, 0.0, 0.0 ) camera.id = brown.id camera.width = brown.width camera.height = brown.height return camera def perspective_camera_from_fisheye(fisheye: pygeometry.Camera) -> pygeometry.Camera: """Create a perspective camera from a fisheye.""" camera = pygeometry.Camera.create_perspective(fisheye.focal, 0.0, 0.0) camera.id = fisheye.id camera.width = fisheye.width camera.height = fisheye.height return camera def perspective_camera_from_fisheye_opencv( fisheye_opencv: pygeometry.Camera, ) -> pygeometry.Camera: """Create a perspective camera from a fisheye extended.""" camera = pygeometry.Camera.create_perspective( fisheye_opencv.focal * (1 + fisheye_opencv.aspect_ratio) / 2.0, 0.0, 0.0 ) camera.id = fisheye_opencv.id camera.width = fisheye_opencv.width camera.height = fisheye_opencv.height return camera def perspective_camera_from_fisheye62( fisheye62: pygeometry.Camera, ) -> pygeometry.Camera: """Create a perspective camera from a fisheye extended.""" camera = pygeometry.Camera.create_perspective( fisheye62.focal * (1 + fisheye62.aspect_ratio) / 2.0, 0.0, 0.0 ) camera.id = fisheye62.id camera.width = fisheye62.width camera.height = fisheye62.height return camera def perspective_views_of_a_panorama( spherical_shot: pymap.Shot, width: int, reconstruction: types.Reconstruction, image_format: str, rig_instance_count: Iterator[int], ) -> List[pymap.Shot]: """Create 6 perspective views of a panorama.""" camera = pygeometry.Camera.create_perspective(0.5, 0.0, 0.0) camera.id = "perspective_panorama_camera" camera.width = width camera.height = width reconstruction.add_camera(camera) names = ["front", "left", "back", "right", "top", "bottom"] rotations = [ tf.rotation_matrix(-0 * np.pi / 2, np.array([0, 1, 0])), tf.rotation_matrix(-1 * np.pi / 2, np.array([0, 1, 0])), tf.rotation_matrix(-2 * np.pi / 2, np.array([0, 1, 0])), tf.rotation_matrix(-3 * np.pi / 2, np.array([0, 1, 0])), tf.rotation_matrix(-np.pi / 2, np.array([1, 0, 0])), tf.rotation_matrix(+np.pi / 2, np.array([1, 0, 0])), ] rig_instance = reconstruction.add_rig_instance( pymap.RigInstance(str(next(rig_instance_count))) ) shots = [] for name, rotation in zip(names, rotations): if name not in reconstruction.rig_cameras: rig_camera_pose = pygeometry.Pose() rig_camera_pose.set_rotation_matrix(rotation[:3, :3]) rig_camera = pymap.RigCamera(rig_camera_pose, name) reconstruction.add_rig_camera(rig_camera) rig_camera = reconstruction.rig_cameras[name] shot_id = add_image_format_extension( f"{spherical_shot.id}_perspective_view_{name}", image_format ) shot = reconstruction.create_shot( shot_id, camera.id, pygeometry.Pose(), rig_camera.id, rig_instance.id ) shot.metadata = spherical_shot.metadata shots.append(shot) rig_instance.pose = spherical_shot.pose return shots def render_perspective_view_of_a_panorama( image: np.ndarray, panoshot: pymap.Shot, perspectiveshot: pymap.Shot, interpolation=cv2.INTER_LINEAR, borderMode=cv2.BORDER_WRAP, ) -> np.ndarray: """Render a perspective view of a panorama.""" # Get destination pixel coordinates dst_shape = (perspectiveshot.camera.height, perspectiveshot.camera.width) dst_y, dst_x = np.indices(dst_shape).astype(np.float32) dst_pixels_denormalized = np.column_stack([dst_x.ravel(), dst_y.ravel()]) dst_pixels = features.normalized_image_coordinates( dst_pixels_denormalized, perspectiveshot.camera.width, perspectiveshot.camera.height, ) # Convert to bearing dst_bearings = perspectiveshot.camera.pixel_bearing_many(dst_pixels) # Rotate to panorama reference frame rotation = np.dot( panoshot.pose.get_rotation_matrix(), perspectiveshot.pose.get_rotation_matrix().T, ) rotated_bearings = np.dot(dst_bearings, rotation.T) # Project to panorama pixels src_pixels = panoshot.camera.project_many(rotated_bearings) src_pixels_denormalized = features.denormalized_image_coordinates( src_pixels, image.shape[1], image.shape[0] ) src_pixels_denormalized.shape = dst_shape + (2,) # Sample color x = src_pixels_denormalized[..., 0].astype(np.float32) y = src_pixels_denormalized[..., 1].astype(np.float32) colors = cv2.remap(image, x, y, interpolation, borderMode=borderMode) return colors def add_subshot_tracks( tracks_manager: pymap.TracksManager, utracks_manager: pymap.TracksManager, shot: pymap.Shot, subshot: pymap.Shot, ) -> None: """Add shot tracks to the undistorted tracks_manager.""" if shot.id not in tracks_manager.get_shot_ids(): return if pygeometry.Camera.is_panorama(shot.camera.projection_type): add_pano_subshot_tracks(tracks_manager, utracks_manager, shot, subshot) else: for track_id, obs in tracks_manager.get_shot_observations(shot.id).items(): utracks_manager.add_observation(subshot.id, track_id, obs) def add_pano_subshot_tracks( tracks_manager: pymap.TracksManager, utracks_manager: pymap.TracksManager, panoshot: pymap.Shot, perspectiveshot: pymap.Shot, ) -> None: """Add edges between subshots and visible tracks.""" for track_id, obs in tracks_manager.get_shot_observations(panoshot.id).items(): bearing = panoshot.camera.pixel_bearing(obs.point) rotation = np.dot( perspectiveshot.pose.get_rotation_matrix(), panoshot.pose.get_rotation_matrix().T, ) rotated_bearing = np.dot(bearing, rotation.T) if rotated_bearing[2] <= 0: continue perspective_feature = perspectiveshot.camera.project(rotated_bearing) if ( perspective_feature[0] < -0.5 or perspective_feature[0] > 0.5 or perspective_feature[1] < -0.5 or perspective_feature[1] > 0.5 ): continue obs.point = perspective_feature utracks_manager.add_observation(perspectiveshot.id, track_id, obs)