opensfm/dataset.py (708 lines of code) (raw):

import gzip
import json
import logging
import os
import pickle
from io import BytesIO
from typing import Dict, List, Tuple, Optional, IO, Any

import numpy as np
from opensfm import (
    config,
    features,
    geo,
    io,
    pygeometry,
    types,
    pymap,
    masking,
    rig,
)
from opensfm.dataset_base import DataSetBase
from PIL.PngImagePlugin import PngImageFile

logger = logging.getLogger(__name__)


def _read_segmentation_labels(io_handler, segmentation_file: str) -> np.ndarray:
    """Read a segmentation PNG and return its label plane.

    A 2D image is returned as is; for a 3D image only the first channel is
    returned.

    :param io_handler: Handler used to open ``segmentation_file``.
    :param segmentation_file: Path of the PNG file to decode.
    :raises IndexError: If the decoded array is neither 2D nor 3D.
    """
    with io_handler.open(segmentation_file, "rb") as fp:
        with PngImageFile(fp) as png_image:
            # TODO: We do not write a header tag in the metadata. Might be good safety check.
            data = np.array(png_image)
            if data.ndim == 2:
                return data
            if data.ndim == 3:
                # TODO we can optionally return also the instances and scores:
                # instances = (
                #     data[:, :, 1].astype(np.int16) + data[:, :, 2].astype(np.int16) * 256
                # )
                # scores = data[:, :, 3].astype(np.float32) / 256.0
                return data[:, :, 0]
            raise IndexError(
                "Unsupported segmentation array with {} dimensions in {}".format(
                    data.ndim, segmentation_file
                )
            )


class DataSet(DataSetBase):
    """Accessors to the main input and output data.

    Data include input images, masks, and segmentation as well as temporary
    data such as features and matches and the final reconstructions.

    All data is stored inside a single folder with a specific subfolder
    structure.

    It is possible to store data remotely or in different formats by
    subclassing this class and overloading its methods.
    """

    io_handler: io.IoFilesystemBase = io.IoFilesystemDefault()
    config = None
    image_files: Dict[str, str] = {}
    mask_files: Dict[str, str] = {}
    image_list: List[str] = []

    def __init__(self, data_path: str, io_handler=io.IoFilesystemDefault) -> None:
        """Init dataset associated to a folder.

        Loads the configuration, the image list and the mask list.

        :param data_path: Root folder of the dataset.
        :param io_handler: Object used for every filesystem access.
        :raises IOError: If ``data_path`` is non-empty and no image is found.
        """
        self.io_handler = io_handler
        self.data_path = data_path
        self.load_config()
        self.load_image_list()
        self.load_mask_list()

    def _config_file(self) -> str:
        """Path of the dataset configuration file."""
        return os.path.join(self.data_path, "config.yaml")

    def load_config(self) -> None:
        """Load ``config.yaml`` if present, otherwise use the default config."""
        config_file_path = self._config_file()
        if self.io_handler.isfile(config_file_path):
            with self.io_handler.open(config_file_path) as f:
                self.config = config.load_config_from_fileobject(f)
        else:
            self.config = config.default_config()

    def _image_list_file(self) -> str:
        """Path of the optional explicit image list."""
        return os.path.join(self.data_path, "image_list.txt")

    def load_image_list(self) -> None:
        """Load image list from image_list.txt or list images/ folder.

        :raises IOError: If ``data_path`` is non-empty and no image is found.
        """
        image_list_file = self._image_list_file()
        image_list_path = os.path.join(self.data_path, "images")

        if self.io_handler.isfile(image_list_file):
            with self.io_handler.open_rt(image_list_file) as fin:
                lines = fin.read().splitlines()
            self._set_image_list(lines)
        else:
            self._set_image_path(image_list_path)

        if self.data_path and not self.image_list:
            raise IOError("No Images found in {}".format(image_list_path))

    def images(self) -> List[str]:
        """List of file names of all images in the dataset."""
        return self.image_list

    def _image_file(self, image: str) -> str:
        """Path to the image file."""
        return self.image_files[image]

    def open_image_file(self, image: str) -> IO[Any]:
        """Open image file and return file object."""
        return self.io_handler.open(self._image_file(image), "rb")

    def load_image(
        self,
        image: str,
        unchanged: bool = False,
        anydepth: bool = False,
        grayscale: bool = False,
    ) -> np.ndarray:
        """Load image pixels as numpy array.

        The array is 3D, indexed by y-coord, x-coord, channel.
        The channels are in RGB order.
        """
        return self.io_handler.imread(
            self._image_file(image),
            unchanged=unchanged,
            anydepth=anydepth,
            grayscale=grayscale,
        )

    def image_size(self, image: str) -> Tuple[int, int]:
        """Height and width of the image."""
        return self.io_handler.image_size(self._image_file(image))

    def load_mask_list(self) -> None:
        """Load mask list from mask_list.txt or list masks/ folder."""
        mask_list_file = os.path.join(self.data_path, "mask_list.txt")
        if self.io_handler.isfile(mask_list_file):
            with self.io_handler.open_rt(mask_list_file) as fin:
                lines = fin.read().splitlines()
            self._set_mask_list(lines)
        else:
            self._set_mask_path(os.path.join(self.data_path, "masks"))

    def load_mask(self, image: str) -> Optional[np.ndarray]:
        """Load image mask if it exists, otherwise return None.

        :raises IOError: If a mask is registered for the image but reading it
            yields None.
        """
        if image not in self.mask_files:
            return None

        mask_path = self.mask_files[image]
        mask = self.io_handler.imread(mask_path, grayscale=True)
        if mask is None:
            raise IOError(
                "Unable to load mask for image {} "
                "from file {}".format(image, mask_path)
            )
        return mask

    def _instances_path(self) -> str:
        """Folder holding the per-image instances files."""
        return os.path.join(self.data_path, "instances")

    def _instances_file(self, image: str) -> str:
        """Path of the instances PNG of an image."""
        return os.path.join(self._instances_path(), image + ".png")

    def load_instances(self, image: str) -> Optional[np.ndarray]:
        """Load image instances file if it exists, otherwise return None."""
        instances_file = self._instances_file(image)
        if self.io_handler.isfile(instances_file):
            return self.io_handler.imread(instances_file, grayscale=True)
        return None

    def _segmentation_path(self) -> str:
        """Folder holding the per-image segmentation files."""
        return os.path.join(self.data_path, "segmentations")

    def _segmentation_file(self, image: str) -> str:
        """Path of the segmentation PNG of an image."""
        return os.path.join(self._segmentation_path(), image + ".png")

    def segmentation_labels(self) -> List[Any]:
        """Segmentation labels; this implementation returns an empty list."""
        return []

    def load_segmentation(self, image: str) -> Optional[np.ndarray]:
        """Load image segmentation if it exists, otherwise return None.

        :raises IndexError: If the stored array is neither 2D nor 3D.
        """
        segmentation_file = self._segmentation_file(image)
        if not self.io_handler.isfile(segmentation_file):
            return None
        return _read_segmentation_labels(self.io_handler, segmentation_file)

    def segmentation_ignore_values(self, image: str) -> List[int]:
        """List of label values to ignore.

        Pixels with these label values will be masked out and won't be
        processed when extracting and matching features.
        """
        return self.config.get("segmentation_ignore_values", [])

    def undistorted_segmentation_ignore_values(self, image: str) -> List[int]:
        """List of label values to ignore on undistorted images

        Pixels with these label values will be masked out and won't be
        processed when computing depthmaps.

        Falls back to ``segmentation_ignore_values`` when the config has no
        ``undistorted_segmentation_ignore_values`` entry.
        """
        return self.config.get(
            "undistorted_segmentation_ignore_values",
            self.segmentation_ignore_values(image),
        )

    def _is_image_file(self, filename: str) -> bool:
        """Whether the text after the last dot is a known image extension."""
        extensions = {"jpg", "jpeg", "png", "tif", "tiff", "pgm", "pnm", "gif"}
        return filename.split(".")[-1].lower() in extensions

    def _set_image_path(self, path: str) -> None:
        """Set image path and find all images in there"""
        self.image_list = []
        self.image_files = {}
        if self.io_handler.exists(path):
            for name in self.io_handler.ls(path):
                if self._is_image_file(name):
                    self.image_list.append(name)
                    self.image_files[name] = os.path.join(path, name)

    def _set_image_list(self, image_list: List[str]) -> None:
        """Register images from paths given relative to the dataset folder.

        Each image is keyed by the base name of its path. Blank lines are
        skipped so that they do not register an image with an empty name.
        """
        self.image_list = []
        self.image_files = {}
        for line in image_list:
            if not line.strip():
                continue
            path = os.path.join(self.data_path, line)
            name = os.path.basename(path)
            self.image_list.append(name)
            self.image_files[name] = path

    def _set_mask_path(self, path: str) -> None:
        """Set mask path and find all masks in there"""
        self.mask_files = {}
        if self.io_handler.isdir(path):
            files = set(self.io_handler.ls(path))
            for image in self.images():
                mask = image + ".png"
                if mask in files:
                    self.mask_files[image] = os.path.join(path, mask)

    def _set_mask_list(self, mask_list_lines: List[str]) -> None:
        """Register masks from ``<image> <relative mask path>`` lines.

        Blank lines are skipped.

        :raises ValueError: If a non-blank line has fewer than two fields.
        """
        self.mask_files = {}
        for line in mask_list_lines:
            if not line.strip():
                continue
            image, relpath = line.split(None, 1)
            path = os.path.join(self.data_path, relpath.strip())
            self.mask_files[image.strip()] = path

    def _exif_path(self) -> str:
        """Return path of extracted exif directory"""
        return os.path.join(self.data_path, "exif")

    def _exif_file(self, image: str) -> str:
        """
        Return path of exif information for given image
        :param image: Image name, with extension (i.e. 123.jpg)
        """
        return os.path.join(self._exif_path(), image + ".exif")

    def load_exif(self, image: str) -> Dict[str, Any]:
        """Load pre-extracted image exif metadata."""
        with self.io_handler.open_rt(self._exif_file(image)) as fin:
            return json.load(fin)

    def save_exif(self, image: str, data: Dict[str, Any]) -> None:
        """Save image exif metadata, creating the exif folder if needed."""
        self.io_handler.mkdir_p(self._exif_path())
        with self.io_handler.open_wt(self._exif_file(image)) as fout:
            io.json_dump(data, fout)

    def exif_exists(self, image: str) -> bool:
        """Check if the exif file of an image exists."""
        return self.io_handler.isfile(self._exif_file(image))

    def feature_type(self) -> str:
        """Return the type of local features (e.g. AKAZE, SURF, SIFT)"""
        feature_name = self.config["feature_type"].lower()
        if self.config["feature_root"]:
            feature_name = "root_" + feature_name
        return feature_name

    def _feature_path(self) -> str:
        """Return path of feature descriptors and FLANN indices directory"""
        return os.path.join(self.data_path, "features")

    def _feature_file(self, image: str) -> str:
        """
        Return path of feature file for specified image
        :param image: Image name, with extension (i.e. 123.jpg)
        """
        return os.path.join(self._feature_path(), image + ".features.npz")

    def _feature_file_legacy(self, image: str) -> str:
        """
        Return path of a legacy feature file for specified image
        :param image: Image name, with extension (i.e. 123.jpg)
        """
        return os.path.join(self._feature_path(), image + ".npz")

    def _save_features(
        self, filepath: str, features_data: features.FeaturesData
    ) -> None:
        """Write features to ``filepath``, creating the features folder."""
        self.io_handler.mkdir_p(self._feature_path())
        with self.io_handler.open(filepath, "wb") as fwb:
            features_data.save(fwb, self.config)

    def features_exist(self, image: str) -> bool:
        """Check if a current or legacy feature file exists for an image."""
        return self.io_handler.isfile(
            self._feature_file(image)
        ) or self.io_handler.isfile(self._feature_file_legacy(image))

    def load_features(self, image: str) -> Optional[features.FeaturesData]:
        """Load features of an image, preferring the legacy file if present."""
        legacy_filepath = self._feature_file_legacy(image)
        features_filepath = (
            legacy_filepath
            if self.io_handler.isfile(legacy_filepath)
            else self._feature_file(image)
        )
        with self.io_handler.open(features_filepath, "rb") as f:
            return features.FeaturesData.from_file(f, self.config)

    def save_features(self, image: str, features_data: features.FeaturesData) -> None:
        """Save features of an image to the current (non-legacy) file."""
        self._save_features(self._feature_file(image), features_data)

    def _words_file(self, image: str) -> str:
        """Path of the words file of an image."""
        return os.path.join(self._feature_path(), image + ".words.npz")

    def words_exist(self, image: str) -> bool:
        """Check if the words file of an image exists."""
        return self.io_handler.isfile(self._words_file(image))

    def load_words(self, image: str) -> np.ndarray:
        """Load the words of an image as an int32 array."""
        with self.io_handler.open(self._words_file(image), "rb") as f:
            s = np.load(f)
            return s["words"].astype(np.int32)

    def save_words(self, image: str, words: np.ndarray) -> None:
        """Save the words of an image, stored compressed as uint16."""
        with self.io_handler.open(self._words_file(image), "wb") as f:
            np.savez_compressed(f, words=words.astype(np.uint16))

    def _matches_path(self) -> str:
        """Return path of matches directory"""
        return os.path.join(self.data_path, "matches")

    def _matches_file(self, image: str) -> str:
        """File for matches for an image"""
        return os.path.join(self._matches_path(), "{}_matches.pkl.gz".format(image))

    def matches_exists(self, image: str) -> bool:
        """Check if the matches file of an image exists."""
        return self.io_handler.isfile(self._matches_file(image))

    def load_matches(self, image: str) -> Dict[str, np.ndarray]:
        """Load the matches of an image from its gzipped pickle file."""
        with self.io_handler.open(self._matches_file(image), "rb") as fin:
            # NOTE(security): unpickling can execute arbitrary code. Only load
            # match files that come from a trusted source.
            matches = pickle.load(BytesIO(gzip.decompress(fin.read())))
        return matches

    def save_matches(self, image: str, matches: Dict[str, np.ndarray]) -> None:
        """Save the matches of an image as a gzipped pickle file."""
        self.io_handler.mkdir_p(self._matches_path())

        with BytesIO() as buffer:
            with gzip.GzipFile(fileobj=buffer, mode="w") as fzip:
                pickle.dump(matches, fzip)
            with self.io_handler.open(self._matches_file(image), "wb") as fw:
                fw.write(buffer.getvalue())

    def find_matches(self, im1: str, im2: str) -> np.ndarray:
        """Find the matches between two images in either image's file.

        Matches stored under ``im2`` are returned with their two columns
        swapped. An empty array is returned when nothing is found.
        """
        if self.matches_exists(im1):
            im1_matches = self.load_matches(im1)
            if im2 in im1_matches:
                return im1_matches[im2]
        if self.matches_exists(im2):
            im2_matches = self.load_matches(im2)
            if im1 in im2_matches:
                if len(im2_matches[im1]):
                    return im2_matches[im1][:, [1, 0]]
        return np.array([])

    def _tracks_manager_file(self, filename: Optional[str] = None) -> str:
        """Return path of tracks file"""
        return os.path.join(self.data_path, filename or "tracks.csv")

    def load_tracks_manager(
        self, filename: Optional[str] = None
    ) -> pymap.TracksManager:
        """Return the tracks manager"""
        with self.io_handler.open(self._tracks_manager_file(filename), "r") as f:
            return pymap.TracksManager.instanciate_from_string(f.read())

    def tracks_exists(self, filename: Optional[str] = None) -> bool:
        """Check if the tracks file exists."""
        return self.io_handler.isfile(self._tracks_manager_file(filename))

    def save_tracks_manager(
        self, tracks_manager: pymap.TracksManager, filename: Optional[str] = None
    ) -> None:
        """Save the tracks manager to the tracks file."""
        with self.io_handler.open(self._tracks_manager_file(filename), "w") as fw:
            fw.write(tracks_manager.as_string())

    def _reconstruction_file(self, filename: Optional[str]) -> str:
        """Return path of reconstruction file"""
        return os.path.join(self.data_path, filename or "reconstruction.json")

    def reconstruction_exists(self, filename: Optional[str] = None) -> bool:
        """Check if the reconstruction file exists."""
        return self.io_handler.isfile(self._reconstruction_file(filename))

    def load_reconstruction(
        self, filename: Optional[str] = None
    ) -> List[types.Reconstruction]:
        """Load the reconstructions stored in the reconstruction file."""
        with self.io_handler.open_rt(self._reconstruction_file(filename)) as fin:
            reconstructions = io.reconstructions_from_json(io.json_load(fin))
        return reconstructions

    def save_reconstruction(
        self,
        reconstruction: List[types.Reconstruction],
        filename: Optional[str] = None,
        minify=False,
    ) -> None:
        """Save reconstructions as JSON; ``minify`` is passed to the dumper."""
        with self.io_handler.open_wt(self._reconstruction_file(filename)) as fout:
            io.json_dump(io.reconstructions_to_json(reconstruction), fout, minify)

    def _reference_lla_path(self) -> str:
        """Path of the reference latitude/longitude/altitude file."""
        return os.path.join(self.data_path, "reference_lla.json")

    def init_reference(self, images: Optional[List[str]] = None) -> None:
        """Initializes the dataset reference if not done already."""
        if not self.reference_exists():
            reference = invent_reference_from_gps_and_gcp(self, images)
            self.save_reference(reference)

    def save_reference(self, reference: geo.TopocentricConverter) -> None:
        """Save the reference as latitude, longitude and altitude."""
        reference_lla = {
            "latitude": reference.lat,
            "longitude": reference.lon,
            "altitude": reference.alt,
        }

        with self.io_handler.open_wt(self._reference_lla_path()) as fout:
            io.json_dump(reference_lla, fout)

    def load_reference(self) -> geo.TopocentricConverter:
        """Load reference as a topocentric converter."""
        with self.io_handler.open_rt(self._reference_lla_path()) as fin:
            lla = io.json_load(fin)

        return geo.TopocentricConverter(
            lla["latitude"], lla["longitude"], lla["altitude"]
        )

    def reference_exists(self) -> bool:
        """Check if the reference file exists."""
        return self.io_handler.isfile(self._reference_lla_path())

    def _camera_models_file(self) -> str:
        """Return path of camera model file"""
        return os.path.join(self.data_path, "camera_models.json")

    def load_camera_models(self) -> Dict[str, pygeometry.Camera]:
        """Return camera models data"""
        with self.io_handler.open_rt(self._camera_models_file()) as fin:
            obj = json.load(fin)
            return io.cameras_from_json(obj)

    def save_camera_models(self, camera_models: Dict[str, pygeometry.Camera]) -> None:
        """Save camera models data"""
        with self.io_handler.open_wt(self._camera_models_file()) as fout:
            obj = io.cameras_to_json(camera_models)
            io.json_dump(obj, fout)

    def _camera_models_overrides_file(self) -> str:
        """Path to the camera model overrides file."""
        return os.path.join(self.data_path, "camera_models_overrides.json")

    def camera_models_overrides_exists(self) -> bool:
        """Check if camera overrides file exists."""
        return self.io_handler.isfile(self._camera_models_overrides_file())

    def load_camera_models_overrides(self) -> Dict[str, pygeometry.Camera]:
        """Load camera models overrides data."""
        with self.io_handler.open_rt(self._camera_models_overrides_file()) as fin:
            obj = json.load(fin)
            return io.cameras_from_json(obj)

    def save_camera_models_overrides(
        self, camera_models: Dict[str, pygeometry.Camera]
    ) -> None:
        """Save camera models overrides data"""
        with self.io_handler.open_wt(self._camera_models_overrides_file()) as fout:
            obj = io.cameras_to_json(camera_models)
            io.json_dump(obj, fout)

    def _exif_overrides_file(self) -> str:
        """Path to the EXIF overrides file."""
        return os.path.join(self.data_path, "exif_overrides.json")

    def exif_overrides_exists(self) -> bool:
        """Check if EXIF overrides file exists."""
        return self.io_handler.isfile(self._exif_overrides_file())

    def load_exif_overrides(self) -> Dict[str, Any]:
        """Load EXIF overrides data."""
        with self.io_handler.open_rt(self._exif_overrides_file()) as fin:
            return json.load(fin)

    def save_exif_overrides(self, exif_overrides: Dict[str, Any]) -> None:
        """Save EXIF overrides data."""
        with self.io_handler.open_wt(self._exif_overrides_file()) as fout:
            io.json_dump(exif_overrides, fout)

    def _rig_cameras_file(self) -> str:
        """Return path of rig models file"""
        return os.path.join(self.data_path, "rig_cameras.json")

    def load_rig_cameras(self) -> Dict[str, pymap.RigCamera]:
        """Return rig models data

        Starts from the default rig cameras built from the camera models and
        overrides them with the entries of the rig cameras file, if any.
        """
        all_rig_cameras = rig.default_rig_cameras(self.load_camera_models())
        if not self.io_handler.exists(self._rig_cameras_file()):
            return all_rig_cameras
        with self.io_handler.open_rt(self._rig_cameras_file()) as fin:
            rig_cameras = io.rig_cameras_from_json(json.load(fin))
            for rig_camera_id, rig_camera in rig_cameras.items():
                all_rig_cameras[rig_camera_id] = rig_camera
        return all_rig_cameras

    def save_rig_cameras(self, rig_cameras: Dict[str, pymap.RigCamera]) -> None:
        """Save rig models data"""
        with self.io_handler.open_wt(self._rig_cameras_file()) as fout:
            io.json_dump(io.rig_cameras_to_json(rig_cameras), fout)

    def _rig_assignments_file(self) -> str:
        """Return path of rig assignments file"""
        return os.path.join(self.data_path, "rig_assignments.json")

    def load_rig_assignments(self) -> Dict[str, List[Tuple[str, str]]]:
        """Return rig assignments data"""
        if not self.io_handler.exists(self._rig_assignments_file()):
            return {}
        with self.io_handler.open_rt(self._rig_assignments_file()) as fin:
            assignments = json.load(fin)

        # Backward compatibility.
        # Older versions of the file were stored as a list of instances without id.
        if isinstance(assignments, list):
            assignments = {str(i): v for i, v in enumerate(assignments)}

        return assignments

    def save_rig_assignments(
        self, rig_assignments: Dict[str, List[Tuple[str, str]]]
    ) -> None:
        """Save rig assignments data"""
        with self.io_handler.open_wt(self._rig_assignments_file()) as fout:
            io.json_dump(rig_assignments, fout)

    def append_to_profile_log(self, content: str) -> None:
        """Append content to the profile.log file."""
        path = os.path.join(self.data_path, "profile.log")
        with self.io_handler.open(path, "a") as fp:
            fp.write(content)

    def _report_path(self) -> str:
        """Folder holding the report files."""
        return os.path.join(self.data_path, "reports")

    def load_report(self, path: str) -> str:
        """Load a report file as a string."""
        with self.io_handler.open_rt(os.path.join(self._report_path(), path)) as fin:
            return fin.read()

    def save_report(self, report_str: str, path: str) -> None:
        """Save report string to a file."""
        filepath = os.path.join(self._report_path(), path)
        self.io_handler.mkdir_p(os.path.dirname(filepath))
        with self.io_handler.open_wt(filepath) as fout:
            fout.write(report_str)

    def _ply_file(self, filename: Optional[str]) -> str:
        """Path of the PLY export of the reconstruction."""
        return os.path.join(self.data_path, filename or "reconstruction.ply")

    def save_ply(
        self,
        reconstruction: types.Reconstruction,
        tracks_manager: pymap.TracksManager,
        filename: Optional[str] = None,
        no_cameras: bool = False,
        no_points: bool = False,
        point_num_views: bool = False,
    ) -> None:
        """Save a reconstruction in PLY format."""
        ply = io.reconstruction_to_ply(
            reconstruction, tracks_manager, no_cameras, no_points, point_num_views
        )
        with self.io_handler.open_wt(self._ply_file(filename)) as fout:
            fout.write(ply)

    def _ground_control_points_file(self) -> str:
        """Path of the JSON ground control points file."""
        return os.path.join(self.data_path, "ground_control_points.json")

    def _gcp_list_file(self) -> str:
        """Path of the text ground control points list."""
        return os.path.join(self.data_path, "gcp_list.txt")

    def load_ground_control_points(self) -> List[pymap.GroundControlPoint]:
        """Load ground control points.

        Points from ``gcp_list.txt`` come first, followed by the points from
        ``ground_control_points.json``. Either file may be missing.
        """
        gcp = []
        if self.io_handler.isfile(self._gcp_list_file()):
            # EXIF is only needed to read the text list, so it is only loaded
            # when that file is present.
            exif = {image: self.load_exif(image) for image in self.images()}
            with self.io_handler.open_rt(self._gcp_list_file()) as fin:
                gcp = io.read_gcp_list(fin, exif)

        pcs = []
        if self.io_handler.isfile(self._ground_control_points_file()):
            with self.io_handler.open_rt(self._ground_control_points_file()) as fin:
                pcs = io.read_ground_control_points(fin)

        return gcp + pcs

    def save_ground_control_points(
        self,
        points: List[pymap.GroundControlPoint],
    ) -> None:
        """Save ground control points to the JSON file."""
        with self.io_handler.open_wt(self._ground_control_points_file()) as fout:
            io.write_ground_control_points(points, fout)

    def image_as_array(self, image: str) -> np.ndarray:
        """Deprecated alias of ``load_image``."""
        logger.warning("image_as_array() is deprecated. Use load_image() instead.")
        return self.load_image(image)

    def mask_as_array(self, image: str) -> Optional[np.ndarray]:
        """Deprecated alias of ``load_mask``."""
        logger.warning("mask_as_array() is deprecated. Use load_mask() instead.")
        return self.load_mask(image)

    def _symlink_existing(self, links: List[Tuple[str, str]]) -> None:
        """Symlink every existing source to its destination, replacing it."""
        for src, dst in links:
            if not self.io_handler.exists(src):
                continue
            self.io_handler.rm_if_exist(dst)
            self.io_handler.symlink(src, dst)

    def subset(self, name: str, images_subset: List[str]) -> "DataSet":
        """Create a subset of this dataset by symlinking input data.

        :param name: Name of the subset folder, created inside ``data_path``.
        :param images_subset: Names of the images to include.
        :return: A dataset opened on the subset folder.
        :raises KeyError: If an image of ``images_subset`` is not in this
            dataset.
        :raises IOError: If the subset ends up with no image.
        """
        subset_dataset_path = os.path.join(self.data_path, name)
        self.io_handler.mkdir_p(subset_dataset_path)
        for folder in ("images", "segmentations", "masks"):
            self.io_handler.mkdir_p(os.path.join(subset_dataset_path, folder))

        image_links = []
        for image in images_subset:
            image_links.append(
                (
                    self._image_file(image),
                    os.path.join(subset_dataset_path, "images", image),
                )
            )
            image_links.append(
                (
                    self._segmentation_file(image),
                    os.path.join(subset_dataset_path, "segmentations", image + ".png"),
                )
            )
            if image in self.mask_files:
                image_links.append(
                    (
                        self.mask_files[image],
                        os.path.join(subset_dataset_path, "masks", image + ".png"),
                    )
                )

        # Images must be linked before the subset dataset is opened: opening a
        # dataset whose images/ folder is empty raises IOError.
        self._symlink_existing(image_links)

        subset_dataset = DataSet(subset_dataset_path, self.io_handler)
        metadata_links = [
            (getattr(self, method)(), getattr(subset_dataset, method)())
            for method in (
                "_camera_models_file",
                "_config_file",
                "_camera_models_overrides_file",
                "_exif_overrides_file",
            )
        ]
        self._symlink_existing(metadata_links)

        # Re-open so that the returned dataset sees the linked config file.
        return DataSet(subset_dataset_path, self.io_handler)

    def undistorted_dataset(self) -> "UndistortedDataSet":
        """Accessor to the undistorted data stored in ``undistorted/``."""
        return UndistortedDataSet(
            self, os.path.join(self.data_path, "undistorted"), self.io_handler
        )


class UndistortedDataSet(object):
    """Accessors to the undistorted data of a dataset.

    Data include undistorted images, masks, and segmentation as well as
    the undistorted reconstruction, tracks graph and computed depth maps.

    All data is stored inside the single folder ``undistorted_data_path``.
    By default, this path is set to the ``undistorted`` subfolder.
    """

    base: DataSetBase
    config: Dict[str, Any] = {}
    data_path: str

    def __init__(
        self,
        base_dataset: DataSetBase,
        undistorted_data_path: str,
        io_handler=io.IoFilesystemDefault,
    ) -> None:
        """Init dataset associated to a folder.

        :param base_dataset: Dataset the undistorted data derives from; its
            config is reused.
        :param undistorted_data_path: Folder holding the undistorted data.
        :param io_handler: Object used for every filesystem access.
        """
        self.base = base_dataset
        self.config = self.base.config
        self.data_path = undistorted_data_path
        self.io_handler = io_handler

    def load_undistorted_shot_ids(self) -> Dict[str, List[str]]:
        """Load the content of ``undistorted_shot_ids.json``."""
        filename = os.path.join(self.data_path, "undistorted_shot_ids.json")
        with self.io_handler.open_rt(filename) as fin:
            return io.json_load(fin)

    def save_undistorted_shot_ids(self, ushot_dict: Dict[str, List[str]]) -> None:
        """Save ``ushot_dict`` to ``undistorted_shot_ids.json``."""
        filename = os.path.join(self.data_path, "undistorted_shot_ids.json")
        self.io_handler.mkdir_p(self.data_path)
        with self.io_handler.open_wt(filename) as fout:
            io.json_dump(ushot_dict, fout, minify=False)

    def _undistorted_image_path(self) -> str:
        """Folder holding the undistorted images."""
        return os.path.join(self.data_path, "images")

    def _undistorted_image_file(self, image: str) -> str:
        """Path of undistorted version of an image."""
        return os.path.join(self._undistorted_image_path(), image)

    def load_undistorted_image(self, image: str) -> np.ndarray:
        """Load undistorted image pixels as a numpy array."""
        return self.io_handler.imread(self._undistorted_image_file(image))

    def save_undistorted_image(self, image: str, array: np.ndarray) -> None:
        """Save undistorted image pixels."""
        self.io_handler.mkdir_p(self._undistorted_image_path())
        self.io_handler.imwrite(self._undistorted_image_file(image), array)

    def undistorted_image_size(self, image: str) -> Tuple[int, int]:
        """Height and width of the undistorted image."""
        return self.io_handler.image_size(self._undistorted_image_file(image))

    def _undistorted_mask_path(self) -> str:
        """Folder holding the undistorted masks."""
        return os.path.join(self.data_path, "masks")

    def _undistorted_mask_file(self, image: str) -> str:
        """Path of undistorted version of a mask."""
        return os.path.join(self._undistorted_mask_path(), image + ".png")

    def undistorted_mask_exists(self, image: str) -> bool:
        """Check if the undistorted mask file exists."""
        return self.io_handler.isfile(self._undistorted_mask_file(image))

    def load_undistorted_mask(self, image: str) -> np.ndarray:
        """Load undistorted mask pixels as a numpy array."""
        return self.io_handler.imread(
            self._undistorted_mask_file(image), grayscale=True
        )

    def save_undistorted_mask(self, image: str, array: np.ndarray) -> None:
        """Save the undistorted image mask."""
        self.io_handler.mkdir_p(self._undistorted_mask_path())
        self.io_handler.imwrite(self._undistorted_mask_file(image), array)

    def _undistorted_segmentation_path(self) -> str:
        """Folder holding the undistorted segmentations."""
        return os.path.join(self.data_path, "segmentations")

    def _undistorted_segmentation_file(self, image: str) -> str:
        """Path of undistorted version of a segmentation."""
        return os.path.join(self._undistorted_segmentation_path(), image + ".png")

    def undistorted_segmentation_exists(self, image: str) -> bool:
        """Check if the undistorted segmentation file exists."""
        return self.io_handler.isfile(self._undistorted_segmentation_file(image))

    def load_undistorted_segmentation(self, image: str) -> np.ndarray:
        """Load an undistorted image segmentation.

        :raises IndexError: If the stored array is neither 2D nor 3D.
        """
        return _read_segmentation_labels(
            self.io_handler, self._undistorted_segmentation_file(image)
        )

    def save_undistorted_segmentation(self, image: str, array: np.ndarray) -> None:
        """Save the undistorted image segmentation."""
        self.io_handler.mkdir_p(self._undistorted_segmentation_path())
        self.io_handler.imwrite(self._undistorted_segmentation_file(image), array)

    def load_undistorted_segmentation_mask(self, image: str) -> Optional[np.ndarray]:
        """Build a mask from the undistorted segmentation.

        The mask is non-zero only for pixels with segmentation
        labels not in undistorted_segmentation_ignore_values.

        If there are no undistorted_segmentation_ignore_values in the config,
        the segmentation_ignore_values are used instead.
        """
        ignore_values = self.base.undistorted_segmentation_ignore_values(image)
        if not ignore_values:
            return None

        segmentation = self.load_undistorted_segmentation(image)
        if segmentation is None:
            return None

        return masking.mask_from_segmentation(segmentation, ignore_values)

    def load_undistorted_combined_mask(self, image: str) -> Optional[np.ndarray]:
        """Combine undistorted binary mask with segmentation mask.

        Return a mask that is non-zero only where the binary
        mask and the segmentation mask are non-zero.
        """
        mask = None
        if self.undistorted_mask_exists(image):
            mask = self.load_undistorted_mask(image)
        smask = None
        if self.undistorted_segmentation_exists(image):
            smask = self.load_undistorted_segmentation_mask(image)
        return masking.combine_masks(mask, smask)

    def _depthmap_path(self) -> str:
        """Folder holding depthmaps and point clouds."""
        return os.path.join(self.data_path, "depthmaps")

    def depthmap_file(self, image: str, suffix: str) -> str:
        """Path to the depthmap file"""
        return os.path.join(self._depthmap_path(), image + "." + suffix)

    def point_cloud_file(self, filename: str = "merged.ply") -> str:
        """Path of a point cloud file inside the depthmaps folder."""
        return os.path.join(self._depthmap_path(), filename)

    def load_point_cloud(
        self, filename: str = "merged.ply"
    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
        """Load a point cloud from a PLY file."""
        with self.io_handler.open(self.point_cloud_file(filename), "r") as fp:
            return io.point_cloud_from_ply(fp)

    def save_point_cloud(
        self,
        points: np.ndarray,
        normals: np.ndarray,
        colors: np.ndarray,
        labels: np.ndarray,
        filename: str = "merged.ply",
    ) -> None:
        """Save a point cloud as a PLY file."""
        self.io_handler.mkdir_p(self._depthmap_path())
        with self.io_handler.open(self.point_cloud_file(filename), "w") as fp:
            io.point_cloud_to_ply(points, normals, colors, labels, fp)

    def raw_depthmap_exists(self, image: str) -> bool:
        """Check if the raw depthmap of an image exists."""
        return self.io_handler.isfile(self.depthmap_file(image, "raw.npz"))

    def save_raw_depthmap(
        self,
        image: str,
        depth: np.ndarray,
        plane: np.ndarray,
        score: np.ndarray,
        nghbr: np.ndarray,
        nghbrs: np.ndarray,
    ) -> None:
        """Save the raw depthmap arrays of an image, compressed."""
        self.io_handler.mkdir_p(self._depthmap_path())
        filepath = self.depthmap_file(image, "raw.npz")
        with self.io_handler.open(filepath, "wb") as f:
            np.savez_compressed(
                f, depth=depth, plane=plane, score=score, nghbr=nghbr, nghbrs=nghbrs
            )

    def load_raw_depthmap(
        self, image: str
    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
        """Load the raw depthmap as (depth, plane, score, nghbr, nghbrs)."""
        with self.io_handler.open(self.depthmap_file(image, "raw.npz"), "rb") as f:
            o = np.load(f)
            return o["depth"], o["plane"], o["score"], o["nghbr"], o["nghbrs"]

    def clean_depthmap_exists(self, image: str) -> bool:
        """Check if the clean depthmap of an image exists."""
        return self.io_handler.isfile(self.depthmap_file(image, "clean.npz"))

    def save_clean_depthmap(
        self, image: str, depth: np.ndarray, plane: np.ndarray, score: np.ndarray
    ) -> None:
        """Save the clean depthmap arrays of an image, compressed."""
        self.io_handler.mkdir_p(self._depthmap_path())
        filepath = self.depthmap_file(image, "clean.npz")
        with self.io_handler.open(filepath, "wb") as f:
            np.savez_compressed(f, depth=depth, plane=plane, score=score)

    def load_clean_depthmap(
        self, image: str
    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Load the clean depthmap as (depth, plane, score)."""
        with self.io_handler.open(self.depthmap_file(image, "clean.npz"), "rb") as f:
            o = np.load(f)
            return o["depth"], o["plane"], o["score"]

    def pruned_depthmap_exists(self, image: str) -> bool:
        """Check if the pruned depthmap of an image exists."""
        return self.io_handler.isfile(self.depthmap_file(image, "pruned.npz"))

    def save_pruned_depthmap(
        self,
        image: str,
        points: np.ndarray,
        normals: np.ndarray,
        colors: np.ndarray,
        labels: np.ndarray,
    ) -> None:
        """Save the pruned depthmap arrays of an image, compressed."""
        self.io_handler.mkdir_p(self._depthmap_path())
        filepath = self.depthmap_file(image, "pruned.npz")
        with self.io_handler.open(filepath, "wb") as f:
            np.savez_compressed(
                f,
                points=points,
                normals=normals,
                colors=colors,
                labels=labels,
            )

    def load_pruned_depthmap(
        self, image: str
    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
        """Load the pruned depthmap as (points, normals, colors, labels)."""
        with self.io_handler.open(self.depthmap_file(image, "pruned.npz"), "rb") as f:
            o = np.load(f)
            return (
                o["points"],
                o["normals"],
                o["colors"],
                o["labels"],
            )

    def load_undistorted_tracks_manager(self) -> pymap.TracksManager:
        """Load the tracks manager stored in the undistorted folder."""
        filename = os.path.join(self.data_path, "tracks.csv")
        with self.io_handler.open(filename, "r") as f:
            return pymap.TracksManager.instanciate_from_string(f.read())

    def save_undistorted_tracks_manager(
        self, tracks_manager: pymap.TracksManager
    ) -> None:
        """Save the tracks manager in the undistorted folder."""
        filename = os.path.join(self.data_path, "tracks.csv")
        with self.io_handler.open(filename, "w") as fw:
            fw.write(tracks_manager.as_string())

    def load_undistorted_reconstruction(self) -> List[types.Reconstruction]:
        """Load the reconstructions stored in the undistorted folder."""
        filename = os.path.join(self.data_path, "reconstruction.json")
        with self.io_handler.open_rt(filename) as fin:
            return io.reconstructions_from_json(io.json_load(fin))

    def save_undistorted_reconstruction(
        self, reconstruction: List[types.Reconstruction]
    ) -> None:
        """Save the reconstructions, minified, in the undistorted folder."""
        filename = os.path.join(self.data_path, "reconstruction.json")
        self.io_handler.mkdir_p(self.data_path)
        with self.io_handler.open_wt(filename) as fout:
            io.json_dump(io.reconstructions_to_json(reconstruction), fout, minify=True)


def invent_reference_from_gps_and_gcp(
    data: DataSetBase, images: Optional[List[str]] = None
) -> geo.TopocentricConverter:
    """Build a topocentric reference from image GPS or ground control points.

    Latitude and longitude are the average of the image GPS positions,
    weighted by ``1 / max(0.01, dop)`` with ``dop`` defaulting to 15. When no
    image has a GPS position, the plain average of the ground control points
    is used instead. With neither available the reference stays at (0, 0).

    :param data: Dataset providing EXIF and ground control points.
    :param images: Images to consider; defaults to all images of ``data``.
    :return: Converter at the averaged latitude/longitude with altitude 0.
    """
    lat, lon, alt = 0.0, 0.0, 0.0
    wlat, wlon, walt = 0.0, 0.0, 0.0
    if images is None:
        images = data.images()
    for image in images:
        d = data.load_exif(image)
        if "gps" in d and "latitude" in d["gps"] and "longitude" in d["gps"]:
            w = 1.0 / max(0.01, d["gps"].get("dop", 15))
            lat += w * d["gps"]["latitude"]
            lon += w * d["gps"]["longitude"]
            wlat += w
            wlon += w
            if "altitude" in d["gps"]:
                alt += w * d["gps"]["altitude"]
                walt += w

    if not wlat and not wlon:
        for gcp in data.load_ground_control_points():
            lat += gcp.lla["latitude"]
            lon += gcp.lla["longitude"]
            wlat += 1
            wlon += 1

            if gcp.has_altitude:
                alt += gcp.lla["altitude"]
                walt += 1

    if wlat:
        lat /= wlat
    if wlon:
        lon /= wlon
    if walt:
        # The averaged altitude is computed but not used for the reference.
        alt /= walt

    return geo.TopocentricConverter(lat, lon, 0)  # Set altitude manually.