opensfm/stats.py

import datetime
import math
import os
import random
import statistics
from collections import defaultdict
from functools import lru_cache
from typing import Dict, List, Tuple, Optional, Any

import matplotlib as mpl
import matplotlib.cm as cm
import matplotlib.colors as colors
import matplotlib.pyplot as plt
import numpy as np
from opensfm import io, multiview, feature_loader, pymap, types, pygeometry
from opensfm.dataset import DataSet, DataSetBase

RESIDUAL_PIXEL_CUTOFF = 4


def _norm2d(point: np.ndarray) -> float:
    return math.sqrt(point[0] * point[0] + point[1] * point[1])


def _length_histogram(
    tracks_manager: pymap.TracksManager, points: Dict[str, pymap.Landmark]
) -> Tuple[List[int], List[int]]:
    hist = defaultdict(int)
    for point in points.values():
        obs_count = point.number_of_observations()
        if not obs_count:
            obs_count = len(tracks_manager.get_track_observations(point.id))
        hist[obs_count] += 1
    return list(hist.keys()), list(hist.values())


def _gps_errors(reconstruction: types.Reconstruction) -> List[np.ndarray]:
    errors = []
    for shot in reconstruction.shots.values():
        if shot.metadata.gps_position.has_value:
            bias = reconstruction.biases[shot.camera.id]
            gps = shot.metadata.gps_position.value
            unbiased_gps = bias.transform(gps)
            optical_center = shot.pose.get_origin()
            errors.append(np.array(optical_center - unbiased_gps))
    return errors


def _gps_gcp_errors_stats(errors: Optional[np.ndarray]) -> Dict[str, Any]:
    if errors is None or len(errors) == 0:
        return {}
    stats = {}
    squared = np.multiply(errors, errors)
    m_squared = np.mean(squared, 0)
    mean = np.mean(errors, 0)
    std_dev = np.std(errors, 0)
    average = np.average(np.linalg.norm(errors, axis=1))
    stats["mean"] = {"x": mean[0], "y": mean[1], "z": mean[2]}
    stats["std"] = {"x": std_dev[0], "y": std_dev[1], "z": std_dev[2]}
    stats["error"] = {
        "x": math.sqrt(m_squared[0]),
        "y": math.sqrt(m_squared[1]),
        "z": math.sqrt(m_squared[2]),
    }
    stats["average_error"] = average
    return stats


def gps_errors(reconstructions: List[types.Reconstruction]) -> Dict[str, Any]:
    all_errors = []
    for rec in reconstructions:
        all_errors += _gps_errors(rec)
    return _gps_gcp_errors_stats(np.array(all_errors))


def gcp_errors(
    data: DataSetBase, reconstructions: List[types.Reconstruction]
) -> Dict[str, Any]:
    reference = data.load_reference()
    gcps = data.load_ground_control_points()
    if not gcps:
        return {}

    all_errors = []
    for gcp in gcps:
        if not gcp.lla:
            continue
        triangulated = None
        for rec in reconstructions:
            triangulated = multiview.triangulate_gcp(gcp, rec.shots, 1.0, 0.1)
            if triangulated is not None:
                break
        if triangulated is None:
            continue
        gcp_enu = reference.to_topocentric(*gcp.lla_vec)
        all_errors.append(triangulated - gcp_enu)
    return _gps_gcp_errors_stats(np.array(all_errors))


def _compute_errors(
    reconstructions: List[types.Reconstruction], tracks_manager: pymap.TracksManager
) -> Any:
    @lru_cache(10)
    def _compute_errors_cached(index, error_type) -> Dict[str, Dict[str, np.ndarray]]:
        return reconstructions[index].map.compute_reprojection_errors(
            tracks_manager,
            error_type,
        )

    return _compute_errors_cached


def _get_valid_observations(
    reconstructions: List[types.Reconstruction], tracks_manager: pymap.TracksManager
) -> Any:
    @lru_cache(10)
    def _get_valid_observations_cached(
        index,
    ) -> Dict[str, Dict[str, pymap.Observation]]:
        return reconstructions[index].map.get_valid_observations(tracks_manager)

    return _get_valid_observations_cached


THist = Tuple[np.ndarray, np.ndarray]
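
# Note on the two factory helpers above: each call returns a fresh
# `lru_cache`-memoized closure, so the closure must be created once and reused
# for the cache to have any effect. Illustrative sketch, assuming
# `reconstructions` and `tracks_manager` are already loaded:
#
#     compute_errors = _compute_errors(reconstructions, tracks_manager)
#     errors = compute_errors(0, pymap.ErrorType.Pixel)  # computed
#     errors = compute_errors(0, pymap.ErrorType.Pixel)  # served from the cache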

def _projection_error(
    tracks_manager: pymap.TracksManager,
    reconstructions: List[types.Reconstruction],
) -> Tuple[float, float, float, THist, THist, THist]:
    all_errors_normalized, all_errors_pixels, all_errors_angular = [], [], []
    average_error_normalized, average_error_pixels, average_error_angular = 0, 0, 0
    # create the memoized closure once so its `lru_cache` is actually reused
    compute_errors = _compute_errors(reconstructions, tracks_manager)
    for i in range(len(reconstructions)):
        errors_normalized = compute_errors(i, pymap.ErrorType.Normalized)
        errors_unnormalized = compute_errors(i, pymap.ErrorType.Pixel)
        errors_angular = compute_errors(i, pymap.ErrorType.Angular)
        for shot_id, shot_errors_normalized in errors_normalized.items():
            shot = reconstructions[i].get_shot(shot_id)
            normalizer = max(shot.camera.width, shot.camera.height)
            for error_normalized, error_unnormalized, error_angular in zip(
                shot_errors_normalized.values(),
                errors_unnormalized[shot_id].values(),
                errors_angular[shot_id].values(),
            ):
                norm_pixels = _norm2d(error_unnormalized * normalizer)
                norm_normalized = _norm2d(error_normalized)
                norm_angle = error_angular[0]
                if norm_pixels > RESIDUAL_PIXEL_CUTOFF or math.isnan(norm_angle):
                    continue
                average_error_normalized += norm_normalized
                average_error_pixels += norm_pixels
                average_error_angular += norm_angle
                all_errors_normalized.append(norm_normalized)
                all_errors_pixels.append(norm_pixels)
                all_errors_angular.append(norm_angle)

    error_count = len(all_errors_normalized)
    if error_count == 0:
        dummy = (np.array([]), np.array([]))
        return (-1.0, -1.0, -1.0, dummy, dummy, dummy)

    bins = 30
    return (
        average_error_normalized / error_count,
        average_error_pixels / error_count,
        average_error_angular / error_count,
        np.histogram(all_errors_normalized, bins),
        np.histogram(all_errors_pixels, bins),
        np.histogram(all_errors_angular, bins),
    )


def reconstruction_statistics(
    data: DataSetBase,
    tracks_manager: pymap.TracksManager,
    reconstructions: List[types.Reconstruction],
) -> Dict[str, Any]:
    stats = {}

    stats["components"] = len(reconstructions)
    gps_count = 0
    for rec in reconstructions:
        for shot in rec.shots.values():
            gps_count += shot.metadata.gps_position.has_value
    stats["has_gps"] = gps_count > 2
    stats["has_gcp"] = bool(data.load_ground_control_points())

    stats["initial_points_count"] = tracks_manager.num_tracks()
    stats["initial_shots_count"] = len(data.images())

    stats["reconstructed_points_count"] = 0
    stats["reconstructed_shots_count"] = 0
    stats["observations_count"] = 0
    hist_agg = defaultdict(int)

    for rec in reconstructions:
        if len(rec.points) > 0:
            stats["reconstructed_points_count"] += len(rec.points)
        stats["reconstructed_shots_count"] += len(rec.shots)

        # get the track length distribution for the current reconstruction
        hist, values = _length_histogram(tracks_manager, rec.points)

        # update the aggregated histogram
        for length, count_tracks in zip(hist, values):
            hist_agg[length] += count_tracks

    # total observations and average track lengths
    hist_agg = sorted(hist_agg.items(), key=lambda x: x[0])
    lengths, counts = np.array([int(x[0]) for x in hist_agg]), np.array(
        [x[1] for x in hist_agg]
    )

    points_count = stats["reconstructed_points_count"]
    points_count_over_two = sum(counts[1:])
    stats["observations_count"] = int(sum(lengths * counts))
    stats["average_track_length"] = (
        (stats["observations_count"] / points_count) if points_count > 0 else -1
    )
    stats["average_track_length_over_two"] = (
        (int(sum(lengths[1:] * counts[1:])) / points_count_over_two)
        if points_count_over_two > 0
        else -1
    )
    stats["histogram_track_length"] = {k: v for k, v in hist_agg}

    (
        avg_normalized,
        avg_pixels,
        avg_angular,
        (hist_normalized, bins_normalized),
        (hist_pixels, bins_pixels),
        (hist_angular, bins_angular),
    ) = _projection_error(tracks_manager, reconstructions)
    stats["reprojection_error_normalized"] = avg_normalized
    stats["reprojection_error_pixels"] = avg_pixels
    stats["reprojection_error_angular"] = avg_angular
    stats["reprojection_histogram_normalized"] = (
        list(map(float, hist_normalized)),
        list(map(float, bins_normalized)),
    )
    stats["reprojection_histogram_pixels"] = (
        list(map(float, hist_pixels)),
        list(map(float, bins_pixels)),
    )
    stats["reprojection_histogram_angular"] = (
        list(map(float, hist_angular)),
        list(map(float, bins_angular)),
    )

    return stats
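
# Shape of the returned dictionary, on a hypothetical two-component dataset
# (the values below are illustrative only):
#
#     reconstruction_statistics(data, tracks_manager, reconstructions)
#     # => {"components": 2, "has_gps": True, "has_gcp": False,
#     #     "reconstructed_points_count": 2000, "observations_count": 6800,
#     #     "average_track_length": 3.4,
#     #     "histogram_track_length": {2: 1200, 3: 800}, ...}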

def processing_statistics(
    data: DataSet, reconstructions: List[types.Reconstruction]
) -> Dict[str, Any]:
    steps = {
        "Feature Extraction": "features.json",
        "Features Matching": "matches.json",
        "Tracks Merging": "tracks.json",
        "Reconstruction": "reconstruction.json",
    }

    steps_times = {}
    for step_name, report_file in steps.items():
        file_path = os.path.join(data.data_path, "reports", report_file)
        if os.path.exists(file_path):
            with io.open_rt(file_path) as fin:
                obj = io.json_load(fin)
        else:
            obj = {}
        if "wall_time" in obj:
            steps_times[step_name] = obj["wall_time"]
        elif "wall_times" in obj:
            steps_times[step_name] = sum(obj["wall_times"].values())
        else:
            steps_times[step_name] = -1

    stats = {}
    stats["steps_times"] = steps_times
    stats["steps_times"]["Total Time"] = sum(
        filter(lambda x: x >= 0, steps_times.values())
    )

    try:
        stats["date"] = datetime.datetime.fromtimestamp(
            data.io_handler.timestamp(data._reconstruction_file(None))
        ).strftime("%d/%m/%Y at %H:%M:%S")
    except FileNotFoundError:
        stats["date"] = "unknown"

    default_max = 1e30
    min_x, min_y, max_x, max_y = default_max, default_max, 0, 0
    for rec in reconstructions:
        for shot in rec.shots.values():
            o = shot.pose.get_origin()
            min_x = min(min_x, o[0])
            min_y = min(min_y, o[1])
            max_x = max(max_x, o[0])
            max_y = max(max_y, o[1])
    stats["area"] = (max_x - min_x) * (max_y - min_y) if min_x != default_max else -1
    return stats


def features_statistics(
    data: DataSetBase,
    tracks_manager: pymap.TracksManager,
    reconstructions: List[types.Reconstruction],
) -> Dict[str, Any]:
    stats = {}
    detected = []
    images = {s for r in reconstructions for s in r.shots}
    for im in images:
        features_data = feature_loader.instance.load_all_data(data, im, False, False)
        if not features_data:
            continue
        detected.append(len(features_data.points))
    if len(detected) > 0:
        stats["detected_features"] = {
            "min": min(detected),
            "max": max(detected),
            "mean": int(np.mean(detected)),
            "median": int(np.median(detected)),
        }
    else:
        stats["detected_features"] = {"min": -1, "max": -1, "mean": -1, "median": -1}

    per_shots = defaultdict(int)
    for rec in reconstructions:
        all_points_keys = set(rec.points.keys())
        for shot_id in rec.shots:
            if shot_id not in tracks_manager.get_shot_ids():
                continue
            for point_id in tracks_manager.get_shot_observations(shot_id):
                if point_id not in all_points_keys:
                    continue
                per_shots[shot_id] += 1
    per_shots = list(per_shots.values())

    stats["reconstructed_features"] = {
        "min": int(min(per_shots)) if len(per_shots) > 0 else -1,
        "max": int(max(per_shots)) if len(per_shots) > 0 else -1,
        "mean": int(np.mean(per_shots)) if len(per_shots) > 0 else -1,
        "median": int(np.median(per_shots)) if len(per_shots) > 0 else -1,
    }
    return stats


def _cameras_statistics(camera_model: pygeometry.Camera) -> Dict[str, Any]:
    camera_stats = {}
    for param_type, param_value in camera_model.get_parameters_map().items():
        camera_stats[str(param_type).split(".")[1]] = param_value
    return camera_stats
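
# The keys produced by `_cameras_statistics` come from pybind enum values, whose
# `str()` renders as "EnumName.MemberName"; the `split(".")[1]` above keeps only
# the member name, e.g. a parameter printing as "<Enum>.Focal" is stored under
# the key "Focal".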

def cameras_statistics(
    data: DataSetBase, reconstructions: List[types.Reconstruction]
) -> Dict[str, Any]:
    stats = {}
    permutation = np.argsort([-len(r.shots) for r in reconstructions])
    for camera_id, camera_model in data.load_camera_models().items():
        stats[camera_id] = {"initial_values": _cameras_statistics(camera_model)}

    for idx in permutation:
        rec = reconstructions[idx]
        for camera in rec.cameras.values():
            if "optimized_values" in stats[camera.id]:
                continue
            stats[camera.id]["optimized_values"] = _cameras_statistics(camera)
            stats[camera.id]["bias"] = io.bias_to_json(rec.biases[camera.id])

    for camera_id in data.load_camera_models():
        if "optimized_values" not in stats[camera_id]:
            del stats[camera_id]

    return stats


def rig_statistics(
    data: DataSetBase, reconstructions: List[types.Reconstruction]
) -> Dict[str, Any]:
    stats = {}
    permutation = np.argsort([-len(r.shots) for r in reconstructions])
    rig_cameras = data.load_rig_cameras()
    cameras = data.load_camera_models()
    for rig_camera_id, rig_camera in rig_cameras.items():
        # we skip per-camera rig cameras for now
        if rig_camera_id in cameras:
            continue
        stats[rig_camera_id] = {
            "initial_values": {
                "rotation": list(rig_camera.pose.rotation),
                "translation": list(rig_camera.pose.translation),
            }
        }

    for idx in permutation:
        rec = reconstructions[idx]
        for rig_camera in rec.rig_cameras.values():
            if rig_camera.id not in stats:
                continue
            if "optimized_values" in stats[rig_camera.id]:
                continue
            stats[rig_camera.id]["optimized_values"] = {
                "rotation": list(rig_camera.pose.rotation),
                "translation": list(rig_camera.pose.translation),
            }

    for rig_camera_id in rig_cameras:
        if rig_camera_id not in stats:
            continue
        if "optimized_values" not in stats[rig_camera_id]:
            del stats[rig_camera_id]

    return stats


def compute_all_statistics(
    data: DataSet,
    tracks_manager: pymap.TracksManager,
    reconstructions: List[types.Reconstruction],
) -> Dict[str, Any]:
    stats = {}

    stats["processing_statistics"] = processing_statistics(data, reconstructions)
    stats["features_statistics"] = features_statistics(
        data, tracks_manager, reconstructions
    )
    stats["reconstruction_statistics"] = reconstruction_statistics(
        data, tracks_manager, reconstructions
    )
    stats["camera_errors"] = cameras_statistics(data, reconstructions)
    stats["rig_errors"] = rig_statistics(data, reconstructions)
    stats["gps_errors"] = gps_errors(reconstructions)
    stats["gcp_errors"] = gcp_errors(data, reconstructions)

    return stats


def _grid_buckets(camera: pygeometry.Camera) -> Tuple[int, int]:
    buckets = 40
    if camera.projection_type == "spherical":
        return 2 * buckets, buckets
    else:
        return buckets, buckets


def _heatmap_buckets(camera: pygeometry.Camera) -> Tuple[int, int]:
    buckets = 500
    if camera.projection_type == "spherical":
        return 2 * buckets, buckets
    else:
        return buckets, int(buckets / camera.width * camera.height)


def _get_gaussian_kernel(radius: int, ratio: float) -> np.ndarray:
    std_dev = radius / ratio
    # symmetric ramp [1, ..., radius + 1, ..., 1]; its outer product with itself
    # peaks at the center, so the exponentiated kernel is center-peaked
    half_kernel = list(range(1, radius + 1))
    kernel = np.array(half_kernel + [radius + 1] + list(reversed(half_kernel)))
    kernel = np.exp(np.outer(kernel.T, kernel) / (2 * std_dev * std_dev))
    return kernel / sum(kernel.flatten())


def save_matchgraph(
    data: DataSetBase,
    tracks_manager: pymap.TracksManager,
    reconstructions: List[types.Reconstruction],
    output_path: str,
    io_handler: io.IoFilesystemBase,
) -> None:
    all_shots = []
    all_points = []
    shot_component = {}
    for i, rec in enumerate(reconstructions):
        all_points += rec.points
        all_shots += rec.shots
        for shot in rec.shots:
            shot_component[shot] = i

    connectivity = tracks_manager.get_all_pairs_connectivity(all_shots, all_points)
    all_values = connectivity.values()
    lowest = np.percentile(list(all_values), 5)
    highest = np.percentile(list(all_values), 95)

    plt.clf()
    cmap = cm.get_cmap("viridis")
    for (node1, node2), edge in sorted(connectivity.items(), key=lambda x: x[1]):
        if edge < 2 * data.config["resection_min_inliers"]:
            continue
        comp1 = shot_component[node1]
        comp2 = shot_component[node2]
        if comp1 != comp2:
            continue
        o1 = reconstructions[comp1].shots[node1].pose.get_origin()
        o2 = reconstructions[comp2].shots[node2].pose.get_origin()
        c = max(0, min(1.0, 1 - (edge - lowest) / (highest - lowest)))
        plt.plot([o1[0], o2[0]], [o1[1], o2[1]], linestyle="-", color=cmap(c))

    for i, rec in enumerate(reconstructions):
        for shot in rec.shots.values():
            o = shot.pose.get_origin()
            c = i / len(reconstructions)
            plt.plot(o[0], o[1], linestyle="", marker="o", color=cmap(c))

    plt.xticks([])
    plt.yticks([])
    ax = plt.gca()
    for b in ["top", "bottom", "left", "right"]:
        ax.spines[b].set_visible(False)

    norm = colors.Normalize(vmin=lowest, vmax=highest)
    sm = cm.ScalarMappable(norm=norm, cmap=cmap.reversed())
    sm.set_array([])
    plt.colorbar(
        sm,
        orientation="horizontal",
        label="Number of matches between images",
        pad=0.0,
    )

    with io_handler.open(os.path.join(output_path, "matchgraph.png"), "wb") as fwb:
        plt.savefig(
            fwb,
            dpi=300,
            bbox_inches="tight",
        )
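
# `save_matchgraph` above and `save_topview` / `save_heatmap` /
# `save_residual_grids` below share the same calling convention
# (`save_residual_histogram` takes the computed stats dict instead). A minimal
# usage sketch, with a hypothetical dataset and output path:
#
#     data = DataSet("path/to/dataset")
#     save_matchgraph(
#         data,
#         data.load_tracks_manager(),
#         data.load_reconstruction(),
#         "path/to/output",
#         data.io_handler,
#     )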

def save_residual_histogram(
    stats: Dict[str, Any],
    output_path: str,
    io_handler: io.IoFilesystemBase,
) -> None:
    backup = dict(mpl.rcParams)
    fig, axs = plt.subplots(1, 3, tight_layout=True, figsize=(15, 3))

    h_norm, b_norm = stats["reconstruction_statistics"][
        "reprojection_histogram_normalized"
    ]
    n, _, p_norm = axs[0].hist(b_norm[:-1], b_norm, weights=h_norm)
    n = n.astype("int")
    for i in range(len(p_norm)):
        p_norm[i].set_facecolor(plt.cm.viridis(n[i] / max(n)))

    h_pixel, b_pixel = stats["reconstruction_statistics"][
        "reprojection_histogram_pixels"
    ]
    n, _, p_pixel = axs[1].hist(b_pixel[:-1], b_pixel, weights=h_pixel)
    n = n.astype("int")
    for i in range(len(p_pixel)):
        p_pixel[i].set_facecolor(plt.cm.viridis(n[i] / max(n)))

    h_angular, b_angular = stats["reconstruction_statistics"][
        "reprojection_histogram_angular"
    ]
    n, _, p_angular = axs[2].hist(b_angular[:-1], b_angular, weights=h_angular)
    n = n.astype("int")
    for i in range(len(p_angular)):
        p_angular[i].set_facecolor(plt.cm.viridis(n[i] / max(n)))

    axs[0].set_title("Normalized Residual")
    axs[1].set_title("Pixel Residual")
    axs[2].set_title("Angular Residual")

    with io_handler.open(
        os.path.join(output_path, "residual_histogram.png"), "wb"
    ) as fwb:
        plt.savefig(
            fwb,
            dpi=300,
            bbox_inches="tight",
        )
    mpl.rcParams = backup


def save_topview(
    data: DataSetBase,
    tracks_manager: pymap.TracksManager,
    reconstructions: List[types.Reconstruction],
    output_path: str,
    io_handler: io.IoFilesystemBase,
) -> None:
    points = []
    colors = []
    for rec in reconstructions:
        for point in rec.points.values():
            track = tracks_manager.get_track_observations(point.id)
            if len(track) < 2:
                continue
            coords = point.coordinates
            points.append(coords)

            r, g, b = [], [], []
            for obs in track.values():
                r.append(obs.color[0])
                g.append(obs.color[1])
                b.append(obs.color[2])
            colors.append(
                (statistics.median(r), statistics.median(g), statistics.median(b))
            )

    all_x = []
    all_y = []
    for rec in reconstructions:
        for shot in rec.shots.values():
            o = shot.pose.get_origin()
            all_x.append(o[0])
            all_y.append(o[1])
            if not shot.metadata.gps_position.has_value:
                continue
            gps = shot.metadata.gps_position.value
            all_x.append(gps[0])
            all_y.append(gps[1])

    # compute the cameras' XY bounding box
    low_x, high_x = np.min(all_x), np.max(all_x)
    low_y, high_y = np.min(all_y), np.max(all_y)

    # get its size
    size_x = high_x - low_x
    size_y = high_y - low_y

    # expand the bounding box by some margin (each axis by its own size)
    margin = 0.05
    low_x -= size_x * margin
    high_x += size_x * margin
    low_y -= size_y * margin
    high_y += size_y * margin

    # update the size
    size_x = high_x - low_x
    size_y = high_y - low_y

    im_size_x = 2000
    im_size_y = int(im_size_x * size_y / size_x)
    topview = np.zeros((im_size_y, im_size_x, 3))

    # splat points using a gaussian kernel + max-pooling
    splatting = 15
    size = 2 * splatting + 1
    kernel = _get_gaussian_kernel(splatting, 2)
    kernel /= kernel[splatting, splatting]
    for point, color in zip(points, colors):
        x, y = int((point[0] - low_x) / size_x * im_size_x), int(
            (point[1] - low_y) / size_y * im_size_y
        )
        if not ((0 < x < (im_size_x - 1)) and (0 < y < (im_size_y - 1))):
            continue
        k_low_x, k_low_y = -min(x - splatting, 0), -min(y - splatting, 0)
        k_high_x, k_high_y = (
            size - max(x + splatting - (im_size_x - 2), 0),
            size - max(y + splatting - (im_size_y - 2), 0),
        )
        h_low_x, h_low_y = max(x - splatting, 0), max(y - splatting, 0)
        h_high_x, h_high_y = min(x + splatting + 1, im_size_x - 1), min(
            y + splatting + 1, im_size_y - 1
        )
        for i in range(3):
            current = topview[h_low_y:h_high_y, h_low_x:h_high_x, i]
            splat = kernel[k_low_y:k_high_y, k_low_x:k_high_x]
            topview[h_low_y:h_high_y, h_low_x:h_high_x, i] = np.maximum(
                splat * (color[i] / 255.0), current
            )

    plt.clf()
    plt.imshow(topview)

    # display the computed cameras' XY positions
    linewidth = 1
    markersize = 4
    for rec in reconstructions:
        sorted_shots = sorted(
            rec.shots.values(), key=lambda x: x.metadata.capture_time.value
        )
        c_camera = cm.get_cmap("cool")(0 / len(reconstructions))
        c_gps = cm.get_cmap("autumn")(0 / len(reconstructions))
        for j, shot in enumerate(sorted_shots):
            o = shot.pose.get_origin()
            x, y = int((o[0] - low_x) / size_x * im_size_x), int(
                (o[1] - low_y) / size_y * im_size_y
            )
            plt.plot(
                x,
                y,
                linestyle="",
                marker="o",
                color=c_camera,
                markersize=markersize,
                linewidth=1,
            )

            # also display the camera path using capture time
            if j < len(sorted_shots) - 1:
                n = sorted_shots[j + 1].pose.get_origin()
                nx, ny = int((n[0] - low_x) / size_x * im_size_x), int(
                    (n[1] - low_y) / size_y * im_size_y
                )
                plt.plot(
                    [x, nx], [y, ny], linestyle="-", color=c_camera, linewidth=linewidth
                )

            # display the GPS error
            if not shot.metadata.gps_position.has_value:
                continue
            gps = shot.metadata.gps_position.value
            gps_x, gps_y = int((gps[0] - low_x) / size_x * im_size_x), int(
                (gps[1] - low_y) / size_y * im_size_y
            )
            plt.plot(
                gps_x,
                gps_y,
                linestyle="",
                marker="v",
                color=c_gps,
                markersize=markersize,
                linewidth=1,
            )
            plt.plot(
                [x, gps_x], [y, gps_y], linestyle="-", color=c_gps, linewidth=linewidth
            )

    plt.xticks(
        [0, im_size_x / 2, im_size_x],
        [0, f"{int(size_x / 2):.0f}", f"{size_x:.0f} meters"],
        fontsize="small",
    )
    plt.yticks(
        [im_size_y, im_size_y / 2, 0],
        [0, f"{int(size_y / 2):.0f}", f"{size_y:.0f} meters"],
        fontsize="small",
    )
    with io_handler.open(os.path.join(output_path, "topview.png"), "wb") as fwb:
        plt.savefig(
            fwb,
            dpi=300,
            bbox_inches="tight",
        )
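
# The kernel/image window arithmetic above (repeated in `save_heatmap` below)
# crops the (2 * splatting + 1)-wide kernel against the image borders. A worked
# example for one axis, assuming splatting = 15 (size = 31), im_size_x = 2000
# and a point at x = 5:
#
#     k_low_x  = -min(5 - 15, 0)        = 10   # skip the 10 off-image columns
#     k_high_x = 31 - max(20 - 1998, 0) = 31
#     h_low_x  = max(5 - 15, 0)         = 0    # clamp the window to the border
#     h_high_x = min(5 + 16, 1999)      = 21   # 21 - 0 == 31 - 10: widths match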

def save_heatmap(
    data: DataSetBase,
    tracks_manager: pymap.TracksManager,
    reconstructions: List[types.Reconstruction],
    output_path: str,
    io_handler: io.IoFilesystemBase,
) -> None:
    all_projections = {}

    splatting = 15
    size = 2 * splatting + 1
    kernel = _get_gaussian_kernel(splatting, 2)

    all_cameras = {}
    for rec in reconstructions:
        for camera in rec.cameras.values():
            all_projections[camera.id] = []
            all_cameras[camera.id] = camera

    # create the memoized closure once so its `lru_cache` is actually reused
    get_valid_observations = _get_valid_observations(reconstructions, tracks_manager)
    for i in range(len(reconstructions)):
        valid_observations = get_valid_observations(i)
        for shot_id, observations in valid_observations.items():
            shot = reconstructions[i].get_shot(shot_id)
            w = shot.camera.width
            h = shot.camera.height
            center = np.array([w / 2.0, h / 2.0])
            normalizer = max(shot.camera.width, shot.camera.height)

            buckets_x, buckets_y = _heatmap_buckets(shot.camera)
            w_bucket = buckets_x / w
            h_bucket = buckets_y / h
            shots_projections = []
            for observation in observations.values():
                bucket = observation.point * normalizer + center
                x = max([0, min([int(bucket[0] * w_bucket), buckets_x - 1])])
                y = max([0, min([int(bucket[1] * h_bucket), buckets_y - 1])])
                shots_projections.append((x, y))
            all_projections[shot.camera.id] += shots_projections

    for camera_id, projections in all_projections.items():
        buckets_x, buckets_y = _heatmap_buckets(all_cameras[camera_id])
        camera_heatmap = np.zeros((buckets_y, buckets_x))
        for x, y in projections:
            k_low_x, k_low_y = -min(x - splatting, 0), -min(y - splatting, 0)
            k_high_x, k_high_y = (
                size - max(x + splatting - (buckets_x - 2), 0),
                size - max(y + splatting - (buckets_y - 2), 0),
            )
            h_low_x, h_low_y = max(x - splatting, 0), max(y - splatting, 0)
            h_high_x, h_high_y = min(x + splatting + 1, buckets_x - 1), min(
                y + splatting + 1, buckets_y - 1
            )
            camera_heatmap[h_low_y:h_high_y, h_low_x:h_high_x] += kernel[
                k_low_y:k_high_y, k_low_x:k_high_x
            ]

        highest = np.max(camera_heatmap)
        lowest = np.min(camera_heatmap)

        plt.clf()
        plt.imshow((camera_heatmap - lowest) / (highest - lowest) * 255)

        plt.title(
            f"Detected features heatmap for camera {camera_id}",
            fontsize="x-small",
        )

        camera = all_cameras[camera_id]
        w = camera.width
        h = camera.height
        plt.xticks(
            [0, buckets_x / 2, buckets_x],
            [0, int(w / 2), w],
            fontsize="x-small",
        )
        plt.yticks(
            [buckets_y, buckets_y / 2, 0],
            [0, int(h / 2), h],
            fontsize="x-small",
        )

        with io_handler.open(
            os.path.join(
                output_path, "heatmap_" + str(camera_id.replace("/", "_")) + ".png"
            ),
            "wb",
        ) as fwb:
            plt.savefig(
                fwb,
                dpi=300,
                bbox_inches="tight",
            )
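
# `save_residual_grids` below averages per-bucket 2D residuals. Note that the
# count array is initialized to 1 rather than 0, so empty buckets divide to a
# zero residual instead of emitting a division-by-zero warning; occupied
# buckets are therefore averaged over count + 1, an apparently deliberate
# simplification.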

def save_residual_grids(
    data: DataSetBase,
    tracks_manager: pymap.TracksManager,
    reconstructions: List[types.Reconstruction],
    output_path: str,
    io_handler: io.IoFilesystemBase,
) -> None:
    all_errors = {}
    all_cameras = {}

    scaling = 4
    for rec in reconstructions:
        for camera_id, camera in rec.cameras.items():
            all_errors[camera_id] = []
            all_cameras[camera_id] = camera

    # create the memoized closures once so their `lru_cache`s are actually reused
    get_valid_observations = _get_valid_observations(reconstructions, tracks_manager)
    compute_errors = _compute_errors(reconstructions, tracks_manager)
    for i in range(len(reconstructions)):
        valid_observations = get_valid_observations(i)
        errors_scaled = compute_errors(i, pymap.ErrorType.Normalized)
        errors_unscaled = compute_errors(i, pymap.ErrorType.Pixel)
        for shot_id, shot_errors in errors_scaled.items():
            shot = reconstructions[i].get_shot(shot_id)
            w = shot.camera.width
            h = shot.camera.height
            center = np.array([w / 2.0, h / 2.0])
            normalizer = max(shot.camera.width, shot.camera.height)

            buckets_x, buckets_y = _grid_buckets(shot.camera)
            w_bucket = buckets_x / w
            h_bucket = buckets_y / h
            shots_errors = []
            for error_scaled, error_unscaled, observation in zip(
                shot_errors.values(),
                errors_unscaled[shot_id].values(),
                valid_observations[shot_id].values(),
            ):
                if _norm2d(error_unscaled * normalizer) > RESIDUAL_PIXEL_CUTOFF:
                    continue

                bucket = observation.point * normalizer + center
                x = max([0, min([int(bucket[0] * w_bucket), buckets_x - 1])])
                y = max([0, min([int(bucket[1] * h_bucket), buckets_y - 1])])
                shots_errors.append((x, y, error_scaled))
            all_errors[shot.camera.id] += shots_errors

    for camera_id, errors in all_errors.items():
        if not errors:
            continue
        buckets_x, buckets_y = _grid_buckets(all_cameras[camera_id])
        camera_array_res = np.zeros((buckets_y, buckets_x, 2))
        camera_array_count = np.full((buckets_y, buckets_x, 1), 1)
        for x, y, e in errors:
            camera_array_res[y, x] += e
            camera_array_count[y, x, 0] += 1
        camera_array_res = np.divide(camera_array_res, camera_array_count)

        camera = all_cameras[camera_id]
        w, h = camera.width, camera.height
        normalizer = max(w, h)

        clamp = 0.1
        res_colors = np.linalg.norm(camera_array_res[:, :, :2], axis=2)
        lowest = np.percentile(res_colors, 0)
        highest = np.percentile(res_colors, 100 * (1 - clamp))
        np.clip(res_colors, lowest, highest, res_colors)
        res_colors /= highest - lowest

        plt.clf()
        plt.figure(figsize=(12, 10))
        Q = plt.quiver(
            camera_array_res[:, :, 0] * scaling,
            camera_array_res[:, :, 1] * scaling,
            res_colors,
            units="xy",
            angles="xy",
            scale_units="xy",
            scale=1,
            width=0.1,
            cmap="viridis_r",
        )
        scale = highest - lowest
        plt.quiverkey(
            Q,
            X=0.1,
            Y=1.04,
            U=scale * scaling,
            label=f"Residual grid scale : {scale:.2f}",
            labelpos="E",
        )
        plt.title(
            " ",
            fontsize="large",
        )
        norm = colors.Normalize(vmin=lowest, vmax=highest)
        cmap = cm.get_cmap("viridis_r")
        sm = cm.ScalarMappable(norm=norm, cmap=cmap)
        sm.set_array([])
        plt.colorbar(
            mappable=sm,
            orientation="horizontal",
            label="Residual Norm",
            pad=0.08,
            aspect=40,
        )

        plt.xticks(
            [0, buckets_x / 2, buckets_x], [0, int(w / 2), w], fontsize="x-small"
        )
        plt.yticks(
            [0, buckets_y / 2, buckets_y], [0, int(h / 2), h], fontsize="x-small"
        )

        with io_handler.open(
            os.path.join(
                output_path, "residuals_" + str(camera_id.replace("/", "_")) + ".png"
            ),
            "wb",
        ) as fwb:
            plt.savefig(
                fwb,
                dpi=300,
                bbox_inches="tight",
            )

def decimate_points(
    reconstructions: List[types.Reconstruction], max_num_points: int
) -> None:
    """
    Destructively decimate the points in a reconstruction
    if they exceed max_num_points, by removing points at random.
    """
    for rec in reconstructions:
        if len(rec.points) > max_num_points:
            all_points = rec.points
            random_ids = list(all_points.keys())
            random.shuffle(random_ids)
            random_ids = set(random_ids[: len(all_points) - max_num_points])
            for point_id in random_ids:
                rec.remove_point(point_id)
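
# End-to-end sketch of the statistics entry point (illustrative only; the
# dataset path and output file are hypothetical):
#
#     data = DataSet("path/to/dataset")
#     reconstructions = data.load_reconstruction()
#     decimate_points(reconstructions, 500000)
#     stats = compute_all_statistics(
#         data, data.load_tracks_manager(), reconstructions
#     )
#     with data.io_handler.open("path/to/dataset/stats/stats.json", "w") as fout:
#         io.json_dump(stats, fout)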