opensfm/synthetic_data/synthetic_generator.py (395 lines of code) (raw):

import logging import math import time from collections import defaultdict from typing import Callable, Tuple, List, Dict, Any, Optional, Union import cv2 import numpy as np import opensfm.synthetic_data.synthetic_dataset as sd import scipy.signal as signal import scipy.spatial as spatial from opensfm import ( geo, pygeometry, reconstruction as rc, types, pymap, features as oft, geometry, ) logger = logging.getLogger(__name__) def derivative(func: Callable, x: np.ndarray) -> np.ndarray: eps = 1e-10 d = (func(x + eps) - func(x)) / eps d /= np.linalg.norm(d) return d def samples_generator_random_count(count: int) -> np.ndarray: return np.random.rand(count) def samples_generator_interval( length: float, end: float, interval: float, interval_noise: float ) -> np.ndarray: samples = np.linspace(0, end / length, num=int(end / interval)) samples += np.random.normal( 0.0, float(interval_noise) / float(length), samples.shape ) return samples def generate_samples_and_local_frame( samples: np.ndarray, shape: Callable ) -> Tuple[np.ndarray, np.ndarray]: points = [] tangents = [] for i in samples: point = shape(i) points += [point] ex = derivative(shape, i) ez = np.array([ex[1], -ex[0]]) tangents += [np.array([ez, ex])] return np.array(points), np.array(tangents) def generate_samples_shifted( samples: np.ndarray, shape: Callable, shift: float ) -> np.ndarray: plane_points = [] for i in samples: point = shape(i) tangent = derivative(shape, i) tangent = np.array([-tangent[1], tangent[0]]) point += tangent * (shift / 2) plane_points += [point] return np.array(plane_points) def generate_z_plane( samples: np.ndarray, shape: Callable, thickness: float ) -> np.ndarray: plane_points = [] for i in samples: point = shape(i) tangent = derivative(shape, i) tangent = np.array([-tangent[1], tangent[0]]) shift = tangent * ((np.random.rand() - 0.5) * thickness) point += shift plane_points += [point] plane_points = np.array(plane_points) return np.insert(plane_points, 2, values=0, axis=1) def generate_xy_planes( samples: np.ndarray, shape: Callable, z_size: float, y_size: float ) -> np.ndarray: xy1 = generate_samples_shifted(samples, shape, y_size) xy2 = generate_samples_shifted(samples, shape, -y_size) xy1 = np.insert(xy1, 2, values=np.random.rand(xy1.shape[0]) * z_size, axis=1) xy2 = np.insert(xy2, 2, values=np.random.rand(xy2.shape[0]) * z_size, axis=1) return np.concatenate((xy1, xy2), axis=0) def generate_street( samples: np.ndarray, shape: Callable, height: float, width: float ) -> Tuple[np.ndarray, np.ndarray]: walls = generate_xy_planes(samples, shape, height, width) floor = generate_z_plane(samples, shape, width) return walls, floor def generate_cameras( samples: np.ndarray, shape: Callable, height: float ) -> Tuple[np.ndarray, np.ndarray]: positions, rotations = generate_samples_and_local_frame(samples, shape) positions = np.insert(positions, 2, values=height, axis=1) rotations = np.insert(rotations, 2, values=0, axis=2) rotations = np.insert(rotations, 1, values=np.array([0, 0, -1]), axis=1) return positions, rotations def line_generator( length: float, center_x: float, center_y: float, transpose: bool, point: np.ndarray ) -> np.ndarray: x = point * length if transpose: return np.transpose( np.array( [ center_y, x + center_x, ] ) ) else: return np.transpose(np.array([x + center_x, center_y])) def ellipse_generator(x_size: float, y_size: float, point: float) -> np.ndarray: y = np.sin(point * 2 * np.pi) * y_size / 2 x = np.cos(point * 2 * np.pi) * x_size / 2 return np.transpose(np.array([x, y])) def perturb_points(points: np.ndarray, sigmas: List[float]) -> None: eps = 1e-10 gaussian = np.array([max(s, eps) for s in sigmas]) for point in points: point += np.random.normal(0.0, gaussian, point.shape) def generate_causal_noise( dimensions: int, sigma: float, n: int, scale: float ) -> List[np.ndarray]: dims = [np.arange(-scale, scale) for _ in range(dimensions)] mesh = np.meshgrid(*dims) dist = np.linalg.norm(mesh, axis=0) filter_kernel = np.exp(-(dist ** 2) / (2 * scale)) noise = np.random.randn(dimensions, n) * sigma return signal.fftconvolve(noise, filter_kernel, mode="same") def generate_exifs( reconstruction: types.Reconstruction, reference: geo.TopocentricConverter, gps_noise: Union[Dict[str, float], float], imu_noise: float, causal_gps_noise: bool = False, ) -> Dict[str, Any]: """Generate fake exif metadata from the reconstruction.""" speed_ms = 10.0 previous_pose = None previous_time = 0 exifs = {} def _gps_dop(shot): gps_dop = 15 if isinstance(gps_noise, float): gps_dop = gps_noise if isinstance(gps_noise, dict): gps_dop = gps_noise[shot.camera.id] return gps_dop per_sequence = defaultdict(list) for shot_name in sorted(reconstruction.shots.keys()): shot = reconstruction.shots[shot_name] exif = {} exif["width"] = shot.camera.width exif["height"] = shot.camera.height exif["camera"] = str(shot.camera.id) exif["make"] = str(shot.camera.id) exif["skey"] = shot.metadata.sequence_key.value per_sequence[exif["skey"]].append(shot_name) if shot.camera.projection_type in ["perspective", "fisheye"]: exif["focal_ratio"] = shot.camera.focal pose = shot.pose.get_origin() if previous_pose is not None: previous_time += np.linalg.norm(pose - previous_pose) * speed_ms previous_pose = pose exif["capture_time"] = previous_time exifs[shot_name] = exif for sequence_images in per_sequence.values(): if causal_gps_noise: sequence_gps_dop = _gps_dop(reconstruction.shots[sequence_images[0]]) perturbations_2d = generate_causal_noise( 2, sequence_gps_dop, len(sequence_images), 2.0 ) for i, shot_name in enumerate(sequence_images): shot = reconstruction.shots[shot_name] exif = exifs[shot_name] origin = shot.pose.get_origin() if causal_gps_noise: gps_perturbation = [perturbations_2d[j][i] for j in range(2)] + [0] else: gps_noise = _gps_dop(shot) gps_perturbation = [gps_noise, gps_noise, 0] origin = np.array([origin]) perturb_points(origin, gps_perturbation) origin = origin[0] _, _, _, comp = rc.shot_lla_and_compass(shot, reference) lat, lon, alt = reference.to_lla(*origin) exif["gps"] = {} exif["gps"]["latitude"] = lat exif["gps"]["longitude"] = lon exif["gps"]["altitude"] = alt exif["gps"]["dop"] = _gps_dop(shot) omega, phi, kappa = geometry.opk_from_rotation( shot.pose.get_rotation_matrix() ) opk_noise = np.random.normal(0.0, np.full((3), imu_noise), (3)) exif["opk"] = {} exif["opk"]["omega"] = math.degrees(omega) + opk_noise[0] exif["opk"]["phi"] = math.degrees(phi) + opk_noise[1] exif["opk"]["kappa"] = math.degrees(kappa) + opk_noise[2] exif["compass"] = {"angle": comp} return exifs def perturb_rotations(rotations: np.ndarray, angle_sigma: float) -> None: for i in range(len(rotations)): rotation = rotations[i] rodrigues = cv2.Rodrigues(rotation)[0].ravel() angle = np.linalg.norm(rodrigues) angle_pertubed = angle + np.random.normal(0.0, angle_sigma) rodrigues *= float(angle_pertubed) / float(angle) rotations[i] = cv2.Rodrigues(rodrigues)[0] def add_points_to_reconstruction( points: np.ndarray, color: np.ndarray, reconstruction: types.Reconstruction ): shift = len(reconstruction.points) for i in range(points.shape[0]): point = reconstruction.create_point(str(shift + i), points[i, :]) point.color = color def add_shots_to_reconstruction( shots: List[List[str]], positions: List[np.ndarray], rotations: List[np.ndarray], rig_cameras: List[pymap.RigCamera], cameras: List[pygeometry.Camera], reconstruction: types.Reconstruction, sequence_key: str, ): for camera in cameras: reconstruction.add_camera(camera) rec_rig_cameras = [] for rig_camera in rig_cameras: rec_rig_cameras.append(reconstruction.add_rig_camera(rig_camera)) for i_shots, position, rotation in zip(shots, positions, rotations): instance_id = "_".join([s[0] for s in i_shots]) rig_instance = reconstruction.add_rig_instance(pymap.RigInstance(instance_id)) rig_instance.pose = pygeometry.Pose(rotation, -rotation.dot(position)) for shot, camera in zip(i_shots, cameras): shot_id = shot[0] rig_camera_id = shot[1] shot = reconstruction.create_shot( shot_id, camera.id, pose=None, rig_camera_id=rig_camera_id, rig_instance_id=instance_id, ) shot.metadata.sequence_key.value = sequence_key def create_reconstruction( points: List[np.ndarray], colors: List[np.ndarray], cameras: List[List[pygeometry.Camera]], shot_ids: List[List[str]], rig_shots: List[List[List[Tuple[str, str]]]], rig_positions: List[np.ndarray], rig_rotations: List[np.ndarray], rig_cameras: List[List[pymap.RigCamera]], reference: Optional[geo.TopocentricConverter], ): reconstruction = types.Reconstruction() if reference is not None: reconstruction.reference = reference for point, color in zip(points, colors): add_points_to_reconstruction(point, color, reconstruction) for i, ( s_rig_shots, s_rig_positions, s_rig_rotations, s_rig_cameras, s_cameras, ) in enumerate(zip(rig_shots, rig_positions, rig_rotations, rig_cameras, cameras)): add_shots_to_reconstruction( s_rig_shots, s_rig_positions, s_rig_rotations, s_rig_cameras, s_cameras, reconstruction, str(f"sequence_{i}"), ) return reconstruction def generate_track_data( reconstruction: types.Reconstruction, maximum_depth: float, projection_noise: float, gcp_noise: Tuple[float, float], gcps_count: Optional[int], gcp_shift: Optional[np.ndarray], on_disk_features_filename: Optional[str], ) -> Tuple[ sd.SyntheticFeatures, pymap.TracksManager, Dict[str, pymap.GroundControlPoint] ]: """Generate projection data from a reconstruction, considering a maximum viewing depth and gaussian noise added to the ideal projections. Returns feature/descriptor/color data per shot and a tracks manager object. """ tracks_manager = pymap.TracksManager() feature_data_type = np.float32 desc_size = 128 non_zeroes = 5 points_ids = list(reconstruction.points) points_coordinates = [p.coordinates for p in reconstruction.points.values()] points_colors = [p.color for p in reconstruction.points.values()] # generate random descriptors per point track_descriptors = [] for _ in points_coordinates: descriptor = np.zeros(desc_size) for _ in range(non_zeroes): index = np.random.randint(0, desc_size) descriptor[index] = np.random.random() * 255 track_descriptors.append(descriptor.round().astype(feature_data_type)) # should speed-up projection queries points_tree = spatial.cKDTree(points_coordinates) start = time.time() features = sd.SyntheticFeatures(on_disk_features_filename) default_scale = 0.004 for index, (shot_index, shot) in enumerate(reconstruction.shots.items()): # query all closest points neighbors = list( sorted(points_tree.query_ball_point(shot.pose.get_origin(), maximum_depth)) ) # project them projections = shot.project_many( np.array([points_coordinates[c] for c in neighbors]) ) # shot constants center = shot.pose.get_origin() z_axis = shot.pose.get_rotation_matrix()[2] is_panorama = pygeometry.Camera.is_panorama(shot.camera.projection_type) perturbation = float(projection_noise) / float( max(shot.camera.width, shot.camera.height) ) sigmas = np.array([perturbation, perturbation]) # pre-generate random perturbations perturbations = np.random.normal(0.0, sigmas, (len(projections), 2)) # run and check valid projections projections_inside = [] descriptors_inside = [] colors_inside = [] for i, (p_id, projection) in enumerate(zip(neighbors, projections)): if not _is_inside_camera(projection, shot.camera): continue point = points_coordinates[p_id] if not is_panorama and not _is_in_front(point, center, z_axis): continue # add perturbation projection += perturbations[i] # push data color = points_colors[p_id] original_id = points_ids[p_id] projections_inside.append([projection[0], projection[1], default_scale]) descriptors_inside.append(track_descriptors[p_id]) colors_inside.append(color) obs = pymap.Observation( projection[0], projection[1], default_scale, color[0], color[1], color[2], len(projections_inside) - 1, ) tracks_manager.add_observation(str(shot_index), str(original_id), obs) features[shot_index] = oft.FeaturesData( np.array(projections_inside), np.array(descriptors_inside), np.array(colors_inside), None, ) if index % 100 == 0: logger.info( f"Flushing images # {index} ({(time.time() - start)/(index+1)} sec. per image" ) features.sync() gcps = {} if gcps_count is not None and gcp_shift is not None: all_track_ids = list(tracks_manager.get_track_ids()) gcps_ids = [ all_track_ids[i] for i in np.random.randint(len(all_track_ids) - 1, size=gcps_count) ] sigmas_gcp = np.random.normal( 0.0, np.array([gcp_noise[0], gcp_noise[0], gcp_noise[1]]), (len(gcps_ids), 3), ) for i, gcp_id in enumerate(gcps_ids): point = reconstruction.points[gcp_id] gcp = pymap.GroundControlPoint() gcp.id = f"gcp-{gcp_id}" enu = point.coordinates + gcp_shift + sigmas_gcp[i] lat, lon, alt = reconstruction.reference.to_lla(*enu) gcp.lla = {"latitude": lat, "longitude": lon, "altitude": alt} gcp.has_altitude = True for shot_id, obs in tracks_manager.get_track_observations(gcp_id).items(): o = pymap.GroundControlPointObservation() o.shot_id = shot_id o.projection = obs.point gcp.add_observation(o) gcps[gcp.id] = gcp return features, tracks_manager, gcps def _is_in_front(point: np.ndarray, center: np.ndarray, z_axis: np.ndarray) -> bool: return ( (point[0] - center[0]) * z_axis[0] + (point[1] - center[1]) * z_axis[1] + (point[2] - center[2]) * z_axis[2] ) > 0 def _is_inside_camera(projection: np.ndarray, camera: pygeometry.Camera) -> bool: w, h = float(camera.width), float(camera.height) w2 = float(2 * camera.width) h2 = float(2 * camera.height) if w > h: return (-0.5 < projection[0] < 0.5) and (-h / w2 < projection[1] < h / w2) else: return (-0.5 < projection[1] < 0.5) and (-w / h2 < projection[0] < w / h2)