in opensfm/features_processing.py [0:0]
def run_features_processing(data: DataSetBase, images: List[str], force: bool) -> None:
    """Main entry point for running features extraction on a list of images.

    Sizes a bounded work queue and the number of workers from the memory
    reported by ``log.memory_available()``, then either handles the images
    one at a time (single worker) or builds producer/consumer tasks and hands
    them to ``parallel_map``.

    Args:
        data: Dataset whose ``config`` supplies ``"processes"`` and
            ``"read_processes"``; also forwarded to the reading helpers.
        images: Identifiers of the images to process. An empty list is a
            no-op.
        force: Forwarded unchanged to ``read_images`` (sequential path) or
            packed into the producer tasks (parallel path).
    """
    if not images:
        # Nothing to do. Returning here also avoids a ZeroDivisionError in
        # the chunking arithmetic of the parallel path (chunk_size would be 0).
        return

    default_queue_size = 10
    max_queue_size = 200

    mem_available = log.memory_available()
    processes = data.config["processes"]
    if mem_available:
        # Use 90% of available memory
        ratio_use = 0.9
        mem_available *= ratio_use
        logger.info(
            f"Planning to use {mem_available} MB of RAM for both processing queue and parallel processing."
        )

        # 50% for the queue / 50% for parallel processing
        expected_mb = mem_available / 2
        # NOTE(review): if this evaluates to 0, queue.Queue treats it as
        # "unbounded" rather than "empty" — confirm that is acceptable.
        expected_images = min(
            max_queue_size, int(expected_mb / average_image_size(data))
        )
        processing_size = average_processing_size(data)
        logger.info(
            f"Scale-space expected size of a single image : {processing_size} MB"
        )
        # At least one worker, never more than the configured count.
        processes = min(max(1, int(expected_mb / processing_size)), processes)
    else:
        # Available memory unknown (or reported as 0): use a fixed queue size
        # and keep the configured number of processes.
        expected_images = default_queue_size
    logger.info(
        f"Expecting to queue at most {expected_images} images while parallel processing of {processes} images."
    )

    process_queue = queue.Queue(expected_images)

    if processes == 1:
        # Sequential path: read and detect one image at a time, each with a
        # fresh counter and an expected count of 1.
        for image in images:
            counter = Counter()
            read_images(process_queue, data, [image], counter, 1, force)
            run_detection(process_queue)
            # Remove one leftover item from the queue before the next image
            # (presumably an end-of-work marker left by run_detection —
            # confirm against that helper).
            process_queue.get()
        return

    counter = Counter()
    # Guard against a zero/negative configuration value, which would otherwise
    # divide by zero below or schedule no producer at all.
    read_processes = max(1, data.config["read_processes"])
    if 1.5 * read_processes >= processes:
        # Readers would take too large a share of the workers: cap them at
        # half of the processes (but keep at least one).
        read_processes = max(1, processes // 2)

    # Split the images into contiguous chunks, one per reader; drop readers
    # that would end up with an empty chunk.
    chunk_size = math.ceil(len(images) / read_processes)
    chunks_count = math.ceil(len(images) / chunk_size)
    read_processes = min(read_processes, chunks_count)

    expected: int = len(images)
    arguments: List[Tuple[str, Any]] = []
    for i in range(read_processes):
        images_chunk = images[i * chunk_size : (i + 1) * chunk_size]
        arguments.append(
            (
                "producer",
                (process_queue, data, images_chunk, counter, expected, force),
            )
        )
    # The consumer payload is the bare queue, exactly as before: the original
    # `(process_queue)` was a parenthesised expression, not a 1-tuple.
    arguments.extend(("consumer", process_queue) for _ in range(processes))
    parallel_map(process, arguments, processes, 1)