opensfm/context.py (89 lines of code) (raw):

import logging import os try: import resource except ModuleNotFoundError: pass # Windows import ctypes import sys from typing import Optional import cv2 from joblib import Parallel, delayed, parallel_backend logger = logging.getLogger(__name__) abspath = os.path.dirname(os.path.realpath(__file__)) SENSOR_DATA = os.path.join(abspath, "data", "sensor_data.json") CAMERA_CALIBRATION = os.path.join(abspath, "data", "camera_calibration.yaml") BOW_PATH = os.path.join(abspath, "data", "bow") # Handle different OpenCV versions OPENCV5 = int(cv2.__version__.split(".")[0]) >= 5 OPENCV4 = int(cv2.__version__.split(".")[0]) >= 4 OPENCV44 = ( int(cv2.__version__.split(".")[0]) == 4 and int(cv2.__version__.split(".")[1]) >= 4 ) OPENCV3 = int(cv2.__version__.split(".")[0]) >= 3 if hasattr(cv2, "flann_Index"): flann_Index = cv2.flann_Index elif hasattr(cv2, "flann") and hasattr(cv2.flann, "Index"): flann_Index = cv2.flann.Index else: logger.warning("Unable to find flann Index") flann_Index = None # Parallel processes def parallel_map(func, args, num_proc, max_batch_size=1): """Run function for all arguments using multiple processes.""" # De-activate/Restore any inner OpenCV threading threads_used = cv2.getNumThreads() cv2.setNumThreads(0) num_proc = min(num_proc, len(args)) if num_proc <= 1: res = list(map(func, args)) else: with parallel_backend("threading", n_jobs=num_proc): batch_size = max(1, int(len(args) / (num_proc * 2))) batch_size = ( min(batch_size, max_batch_size) if max_batch_size else batch_size ) res = Parallel(batch_size=batch_size)(delayed(func)(arg) for arg in args) cv2.setNumThreads(threads_used) return res # Memory usage if sys.platform == "win32": class MEMORYSTATUSEX(ctypes.Structure): _fields_ = [ ("dwLength", ctypes.c_ulong), ("dwMemoryLoad", ctypes.c_ulong), ("ullTotalPhys", ctypes.c_ulonglong), ("ullAvailPhys", ctypes.c_ulonglong), ("ullTotalPageFile", ctypes.c_ulonglong), ("ullAvailPageFile", ctypes.c_ulonglong), ("ullTotalVirtual", ctypes.c_ulonglong), ("ullAvailVirtual", ctypes.c_ulonglong), ("sullAvailExtendedVirtual", ctypes.c_ulonglong), ] def __init__(self): # have to initialize this to the size of MEMORYSTATUSEX self.dwLength = ctypes.sizeof(self) super(MEMORYSTATUSEX, self).__init__() def memory_available() -> Optional[int]: """Available memory in MB. Only works on Windows """ stat = MEMORYSTATUSEX() ctypes.windll.kernel32.GlobalMemoryStatusEx(ctypes.byref(stat)) return stat.ullAvailPhys / 1024 / 1024 def current_memory_usage(): stat = MEMORYSTATUSEX() ctypes.windll.kernel32.GlobalMemoryStatusEx(ctypes.byref(stat)) return (stat.ullTotalPhys - stat.ullAvailPhys) / 1024 else: if sys.platform == "darwin": rusage_unit = 1 else: rusage_unit = 1024 def memory_available() -> Optional[int]: """Available memory in MB. Only works on linux and returns None otherwise. """ with os.popen("free -t -m") as fp: lines = fp.readlines() if not lines: return None available_mem = int(lines[1].split()[6]) return available_mem def current_memory_usage(): return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss * rusage_unit def processes_that_fit_in_memory(desired: int, per_process: int) -> int: """Amount of parallel BoW process that fit in memory.""" available_mem = memory_available() if available_mem is not None: fittable = max(1, int(available_mem / per_process)) return min(desired, fittable) else: return desired