# opensfm/feature_loading.py
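"""Cached loading of per-image feature data.

FeatureLoader wraps DataSetBase feature reads with lru_cache so that
points, masks, bearings, FLANN indices and word assignments are not
re-read from disk on every access.
"""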
import logging
from functools import lru_cache
from typing import Any, Optional, Tuple

import numpy as np

from opensfm import features as ft, masking, pygeometry
from opensfm.dataset_base import DataSetBase

logger = logging.getLogger(__name__)

# Determined experimentally for HAHOG UCHAR type descriptors.
SEGMENTATION_IN_DESCRIPTOR_MULT = 35


class FeatureLoader:
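    """Load and cache per-image feature data (points, descriptors, colors, semantics)."""
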
    def clear_cache(self) -> None:
        self.load_mask.cache_clear()
        self.load_points_colors_segmentations_instances.cache_clear()
        # Also clear the bearings cache so stale entries are not served
        # after the underlying dataset changes.
        self.load_bearings.cache_clear()
        self._load_all_data_unmasked.cache_clear()
        self._load_all_data_masked.cache_clear()
        self.load_features_index.cache_clear()
        self.load_words.cache_clear()

@lru_cache(1000)
def load_mask(self, data: DataSetBase, image: str) -> Optional[np.ndarray]:
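        """Return a boolean keep-mask over the image's features, or None."""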
all_features_data = self._load_all_data_unmasked(data, image)
if not all_features_data:
return None
if (
data.config["features_bake_segmentation"]
and all_features_data.semantic is not None
):
# feature mask for baked segmentation
segmentations = all_features_data.semantic.segmentation
ignore_values = set(data.segmentation_ignore_values(image))
            smask = np.array(
                [s not in ignore_values for s in segmentations], dtype=bool
            )
# combine with 'classic' mask if any
mask_image = data.load_mask(image)
if mask_image is not None:
mask = masking.load_features_mask(
data, image, all_features_data.points[:, :2], mask_image
)
smask &= mask
            n_removed = int(np.sum(smask == 0))
            # Guard against images with zero features when computing the ratio.
            logger.debug(
                "Masking {} / {} ({:.2f}) features for {}".format(
                    n_removed, len(smask), n_removed / max(len(smask), 1), image
                )
            )
return smask
else:
return masking.load_features_mask(
data, image, all_features_data.points[:, :2]
)

    @lru_cache(1000)
def load_points_colors_segmentations_instances(
self, data: DataSetBase, image: str
) -> Optional[ft.FeaturesData]:
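        """Load feature data with descriptors dropped (points, colors, semantics only)."""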
all_features_data = self._load_features_nocache(data, image)
if not all_features_data:
return None
return ft.FeaturesData(
all_features_data.points,
None,
all_features_data.colors,
all_features_data.semantic,
)

    @lru_cache(2000)
def load_bearings(
self,
data: DataSetBase,
image: str,
masked: bool,
camera: pygeometry.Camera,
) -> Optional[np.ndarray]:
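        """Return bearing vectors for the image's keypoints, or None."""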
if masked:
features_data = self._load_all_data_masked(data, image)
else:
features_data = self._load_all_data_unmasked(data, image)
if not features_data:
return None
keypoints_2d = np.array(features_data.points[:, :2], dtype=float)
bearings_3d = camera.pixel_bearing_many(keypoints_2d)
return bearings_3d

    def load_all_data(
self,
data: DataSetBase,
image: str,
masked: bool,
segmentation_in_descriptor: bool,
) -> Optional[ft.FeaturesData]:
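        """Load full feature data, optionally masked, optionally with the
        segmentation label appended to each descriptor."""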
if masked:
features_data = self._load_all_data_masked(data, image)
else:
features_data = self._load_all_data_unmasked(data, image)
if not features_data:
return None
if segmentation_in_descriptor:
return self._add_segmentation_in_descriptor(data, features_data)
else:
return features_data

    def _add_segmentation_in_descriptor(
self, data: DataSetBase, features: ft.FeaturesData
) -> ft.FeaturesData:
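        """Append each feature's segmentation label as an extra descriptor column."""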
if (
not data.config["hahog_normalize_to_uchar"]
or data.config["feature_type"] != "HAHOG"
):
raise RuntimeError(
"Semantic segmentation in descriptor only supported for HAHOG UCHAR descriptors"
)
        segmentation = features.get_segmentation()
        if segmentation is None:
            return features
        descriptors = features.descriptors
        if descriptors is None:
            return features
        desc_augmented = np.concatenate(
            (
                descriptors,
                np.asarray(segmentation, dtype=np.float32)[:, np.newaxis],
            ),
            axis=1,
        )
        # Scale the appended column so the label is weighted comparably
        # to the descriptor dimensions during matching.
        desc_augmented[:, -1] *= SEGMENTATION_IN_DESCRIPTOR_MULT
return ft.FeaturesData(
features.points,
desc_augmented,
features.colors,
features.semantic,
)

    @lru_cache(20)
def _load_all_data_unmasked(
self, data: DataSetBase, image: str
) -> Optional[ft.FeaturesData]:
return self._load_features_nocache(data, image)

    @lru_cache(200)
def _load_all_data_masked(
self, data: DataSetBase, image: str
) -> Optional[ft.FeaturesData]:
features_data = self._load_all_data_unmasked(data, image)
if not features_data:
return features_data
mask = self.load_mask(data, image)
if mask is not None:
return features_data.mask(mask)
return features_data

    @lru_cache(200)
def load_features_index(
self,
data: DataSetBase,
image: str,
masked: bool,
segmentation_in_descriptor: bool,
) -> Optional[Tuple[ft.FeaturesData, Any]]:
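        """Load feature data plus a FLANN index built over its descriptors, or None."""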
features_data = self.load_all_data(
data, image, masked, segmentation_in_descriptor
)
if not features_data:
return None
descriptors = features_data.descriptors
if descriptors is None:
return None
return features_data, ft.build_flann_index(
descriptors,
data.config,
)

    @lru_cache(200)
def load_words(self, data: DataSetBase, image: str, masked: bool) -> np.ndarray:
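        """Load per-feature word assignments for the image, optionally masked."""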
words = data.load_words(image)
if masked:
mask = self.load_mask(data, image)
if mask is not None:
words = words[mask]
return words

    def _load_features_nocache(
self, data: DataSetBase, image: str
) -> Optional[ft.FeaturesData]:
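        """Read feature data from disk without caching."""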
        features_data = data.load_features(image)
        if features_data is None:
            logger.error("Could not load features for image {}".format(image))
            return None
        # Keep only the first three point columns (x, y, size).
        features_data.points = np.array(features_data.points[:, :3], dtype=float)
        return features_data
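

# Example usage (a sketch, not part of the module API; assumes an OpenSfM
# dataset directory at "path/to/dataset" containing an image "im1.jpg"):
#
#     from opensfm.dataset import DataSet
#
#     loader = FeatureLoader()
#     dataset = DataSet("path/to/dataset")
#     features = loader.load_all_data(
#         dataset, "im1.jpg", masked=True, segmentation_in_descriptor=False
#     )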