# opensfm/features_processing.py

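"""Feature-extraction pipeline.

Producer threads read images into a memory-bounded queue.Queue and parallel
consumers run feature detection on them; run_features_processing() below is
the entry point that sizes the queue and the worker pool from available RAM.
"""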
import itertools
import logging
import math
import queue
import threading
from timeit import default_timer as timer
from typing import Optional, List, Dict, Any, Tuple

import numpy as np
from opensfm import bow, features, io, log, pygeometry, upright, masking
from opensfm.context import parallel_map
from opensfm.dataset_base import DataSetBase

logger = logging.getLogger(__name__)


def run_features_processing(data: DataSetBase, images: List[str], force: bool) -> None:
    """Main entry point for running feature extraction on a list of images."""
    default_queue_size = 10
    max_queue_size = 200

    mem_available = log.memory_available()
    processes = data.config["processes"]
    if mem_available:
        # Use 90% of available memory
        ratio_use = 0.9
        mem_available *= ratio_use
        logger.info(
            f"Planning to use {mem_available} MB of RAM for both processing queue and parallel processing."
        )

        # 50% for the queue / 50% for parallel processing
        expected_mb = mem_available / 2
        expected_images = min(
            max_queue_size, int(expected_mb / average_image_size(data))
        )
        processing_size = average_processing_size(data)
        logger.info(
            f"Expected scale-space size of a single image: {processing_size} MB"
        )
        processes = min(max(1, int(expected_mb / processing_size)), processes)
    else:
        expected_images = default_queue_size
    logger.info(
        f"Expecting to queue at most {expected_images} images while processing {processes} images in parallel."
    )

    process_queue = queue.Queue(expected_images)
    arguments: List[Tuple[str, Any]] = []

    if processes == 1:
        for image in images:
            counter = Counter()
            read_images(process_queue, data, [image], counter, 1, force)
            run_detection(process_queue)
            process_queue.get()
    else:
        counter = Counter()
        read_processes = data.config["read_processes"]
        if 1.5 * read_processes >= processes:
            read_processes = max(1, processes // 2)

        chunk_size = math.ceil(len(images) / read_processes)
        chunks_count = math.ceil(len(images) / chunk_size)
        read_processes = min(read_processes, chunks_count)

        expected: int = len(images)
        for i in range(read_processes):
            images_chunk = images[i * chunk_size : (i + 1) * chunk_size]
            arguments.append(
                (
                    "producer",
                    (process_queue, data, images_chunk, counter, expected, force),
                )
            )
        for _ in range(processes):
            arguments.append(("consumer", (process_queue)))
        parallel_map(process, arguments, processes, 1)


def average_image_size(data: DataSetBase) -> float:
    average_size_mb = 0
    for camera in data.load_camera_models().values():
        average_size_mb += camera.width * camera.height * 4 / 1024 / 1024
    return average_size_mb / max(1, len(data.load_camera_models()))


def average_processing_size(data: DataSetBase) -> float:
    processing_size = data.config["feature_process_size"]
    min_octave_size = 16  # from covdet.c
    octaveResolution = 3  # from covdet.c

    start_size = processing_size * processing_size * 4 / 1024 / 1024
    last_octave = math.floor(math.log2(processing_size / min_octave_size))

    total_size = 0
    for _ in range(last_octave + 1):
        total_size += start_size * octaveResolution
        start_size /= 2
    return total_size
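# Worked example for average_processing_size() (illustrative numbers only):
# with feature_process_size = 2048, the base level needs
# 2048 * 2048 * 4 bytes = 16 MB, and the scale space spans
# floor(log2(2048 / 16)) + 1 = 8 octaves of octaveResolution = 3 levels each,
# every octave holding half the memory of the previous one, so the estimate is
#     3 * 16 * (1 + 1/2 + ... + 1/128) ~= 95.6 MB.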
def is_high_res_panorama(
    data: DataSetBase, image_key: str, image_array: np.ndarray
) -> bool:
    """Detect if image is a panorama."""
    exif = data.load_exif(image_key)
    if exif:
        camera = data.load_camera_models()[exif["camera"]]
        w, h = int(exif["width"]), int(exif["height"])
        exif_pano = pygeometry.Camera.is_panorama(camera.projection_type)
    elif image_array is not None:
        h, w = image_array.shape[:2]
        exif_pano = False
    else:
        return False
    return w == 2 * h or exif_pano


class Counter(object):
    """Lock-less counter from
    https://julien.danjou.info/atomic-lock-free-counters-in-python/
    that relies on CPython's thread-safe implementation of itertools.count().

    Used because, for some reason, joblib doesn't play well with a good old
    threading.Lock (everything gets stuck).
    """

    def __init__(self):
        self.number_of_read = 0
        self.counter = itertools.count()
        self.read_lock = threading.Lock()

    def increment(self):
        next(self.counter)

    def value(self):
        with self.read_lock:
            value = next(self.counter) - self.number_of_read
            self.number_of_read += 1
        return value


def process(args: Tuple[str, Any]) -> None:
    process_type, real_args = args
    if process_type == "producer":
        queue, data, images, counter, expected, force = real_args
        read_images(queue, data, images, counter, expected, force)
    if process_type == "consumer":
        queue = real_args
        run_detection(queue)


def read_images(
    queue: queue.Queue,
    data: DataSetBase,
    images: List[str],
    counter: Counter,
    expected: int,
    force: bool,
) -> None:
    full_queue_timeout = 120
    for image in images:
        logger.info(f"Reading data for image {image} (queue-size={queue.qsize()})")
        image_array = data.load_image(image)
        if data.config["features_bake_segmentation"]:
            segmentation_array = data.load_segmentation(image)
            instances_array = data.load_instances(image)
        else:
            segmentation_array, instances_array = None, None
        args = image, image_array, segmentation_array, instances_array, data, force
        queue.put(args, block=True, timeout=full_queue_timeout)
        counter.increment()
        if counter.value() == expected:
            logger.info("Finished reading images")
            queue.put(None)


def run_detection(queue: queue.Queue):
    while True:
        args = queue.get()
        if args is None:
            queue.put(None)
            break
        image, image_array, segmentation_array, instances_array, data, force = args
        detect(image, image_array, segmentation_array, instances_array, data, force)
        del image_array
        del segmentation_array
        del instances_array
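# Shutdown protocol for the producer/consumer pipeline above: once the shared
# Counter reaches the expected image count, read_images() enqueues a single
# None sentinel, and every run_detection() consumer re-enqueues it before
# breaking out of its loop, so one sentinel suffices to stop all consumers
# regardless of how many are running.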
def bake_segmentation(
    image: np.ndarray,
    points: np.ndarray,
    segmentation: Optional[np.ndarray],
    instances: Optional[np.ndarray],
    exif: Dict[str, Any],
) -> Tuple[Optional[np.ndarray], Optional[np.ndarray]]:
    exif_height, exif_width, exif_orientation = (
        exif["height"],
        exif["width"],
        exif["orientation"],
    )
    height, width = image.shape[:2]
    if exif_height != height or exif_width != width:
        logger.error(
            f"Image has inconsistent EXIF dimensions ({exif_width}, {exif_height}) "
            f"and image dimensions ({width}, {height}). Orientation={exif_orientation}"
        )

    panoptic_data = [None, None]
    for i, p_data in enumerate([segmentation, instances]):
        if p_data is None:
            continue
        new_height, new_width = p_data.shape
        ps = upright.opensfm_to_upright(
            points[:, :2],
            width,
            height,
            exif["orientation"],
            new_width=new_width,
            new_height=new_height,
        ).astype(int)
        panoptic_data[i] = p_data[ps[:, 1], ps[:, 0]]

    return tuple(panoptic_data)


def detect(
    image: str,
    image_array: np.ndarray,
    segmentation_array: Optional[np.ndarray],
    instances_array: Optional[np.ndarray],
    data: DataSetBase,
    force: bool = False,
) -> None:
    log.setup()

    need_words = (
        data.config["matcher_type"] == "WORDS"
        or data.config["matching_bow_neighbors"] > 0
    )
    has_words = not need_words or data.words_exist(image)
    has_features = data.features_exist(image)

    if not force and has_features and has_words:
        logger.info(
            "Skip recomputing {} features for image {}".format(
                data.feature_type().upper(), image
            )
        )
        return

    logger.info(
        "Extracting {} features for image {}".format(
            data.feature_type().upper(), image
        )
    )

    start = timer()

    p_unmasked, f_unmasked, c_unmasked = features.extract_features(
        image_array, data.config, is_high_res_panorama(data, image, image_array)
    )

    # Load segmentation and bake it into the data
    if data.config["features_bake_segmentation"]:
        exif = data.load_exif(image)
        s_unsorted, i_unsorted = bake_segmentation(
            image_array, p_unmasked, segmentation_array, instances_array, exif
        )
        p_unsorted = p_unmasked
        f_unsorted = f_unmasked
        c_unsorted = c_unmasked
    # Load segmentation, make a mask from it and apply it
    else:
        s_unsorted, i_unsorted = None, None
        fmask = masking.load_features_mask(data, image, p_unmasked)
        p_unsorted = p_unmasked[fmask]
        f_unsorted = f_unmasked[fmask]
        c_unsorted = c_unmasked[fmask]

    if len(p_unsorted) == 0:
        logger.warning("No features found in image {}".format(image))

    size = p_unsorted[:, 2]
    order = np.argsort(size)
    p_sorted = p_unsorted[order, :]
    f_sorted = f_unsorted[order, :]
    c_sorted = c_unsorted[order, :]

    if s_unsorted is not None:
        semantic_data = features.SemanticData(
            s_unsorted[order],
            i_unsorted[order] if i_unsorted is not None else None,
            data.segmentation_labels(),
        )
    else:
        semantic_data = None

    features_data = features.FeaturesData(p_sorted, f_sorted, c_sorted, semantic_data)
    data.save_features(image, features_data)

    if need_words:
        bows = bow.load_bows(data.config)
        n_closest = data.config["bow_words_to_match"]
        closest_words = bows.map_to_words(
            f_sorted, n_closest, data.config["bow_matcher_type"]
        )
        data.save_words(image, closest_words)

    end = timer()
    report = {
        "image": image,
        "num_features": len(p_sorted),
        "wall_time": end - start,
    }
    data.save_report(io.json_dumps(report), "features/{}.json".format(image))
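# Example usage (a minimal sketch; assumes the concrete DataSet implementation
# from opensfm.dataset, which is what the `opensfm detect_features` command
# constructs for a dataset directory):
#
#     from opensfm.dataset import DataSet
#
#     data = DataSet("path/to/dataset")
#     run_features_processing(data, data.images(), force=False)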