opensfm/features.py (476 lines of code) (raw):
"""Tools to extract features."""
import logging
import time
from typing import Tuple, Dict, Any, List, Optional
import cv2
import numpy as np
from opensfm import context, pyfeatures
logger = logging.getLogger(__name__)
class SemanticData:
    """Per-feature semantic information: segmentation, optional instances, labels."""

    segmentation: np.ndarray
    instances: Optional[np.ndarray]
    labels: List[Dict[str, Any]]

    def __init__(
        self,
        segmentation: np.ndarray,
        instances: Optional[np.ndarray],
        labels: List[Dict[str, Any]],
    ):
        self.segmentation = segmentation
        self.instances = instances
        self.labels = labels

    def has_instances(self) -> bool:
        """Whether per-feature instance labels are present."""
        return self.instances is not None

    def mask(self, mask: np.ndarray) -> "SemanticData":
        """Return a copy restricted to `mask`; the labels list is shared, not copied."""
        try:
            kept_segmentation = self.segmentation[mask]
            if self.instances is not None:
                kept_instances = self.instances[mask]
            else:
                kept_instances = None
        except IndexError:
            logger.error(
                f"Invalid mask array of dtype {mask.dtype}, shape {mask.shape}: {mask}"
            )
            raise
        return SemanticData(kept_segmentation, kept_instances, self.labels)
class FeaturesData:
    """Features of an image: keypoints, descriptors, colors and optional semantics.

    Handles (de)serialization to/from ``.npz`` archives, with an explicit
    on-disk format version (see the ``_from_file_v*`` readers).
    """

    points: np.ndarray
    descriptors: Optional[np.ndarray]
    colors: np.ndarray
    semantic: Optional["SemanticData"]

    FEATURES_VERSION: int = 2
    FEATURES_HEADER: str = "OPENSFM_FEATURES_VERSION"

    def __init__(
        self,
        points: np.ndarray,
        descriptors: Optional[np.ndarray],
        colors: np.ndarray,
        semantic: Optional["SemanticData"],
    ):
        self.points = points
        self.descriptors = descriptors
        self.colors = colors
        self.semantic = semantic

    def get_segmentation(self) -> Optional[np.ndarray]:
        """Return per-feature segmentation labels, or None if unavailable."""
        semantic = self.semantic
        if not semantic:
            return None
        if semantic.segmentation is not None:
            return semantic.segmentation
        return None

    def has_instances(self) -> bool:
        """Whether per-feature instance labels are available."""
        semantic = self.semantic
        if not semantic:
            return False
        return semantic.instances is not None

    def mask(self, mask: np.ndarray) -> "FeaturesData":
        """Return a copy of the features restricted to `mask` (boolean/index array)."""
        if self.semantic:
            masked_semantic = self.semantic.mask(mask)
        else:
            masked_semantic = None
        return FeaturesData(
            self.points[mask],
            self.descriptors[mask] if self.descriptors is not None else None,
            self.colors[mask],
            masked_semantic,
        )

    def save(self, fileobject: Any, config: Dict[str, Any]) -> None:
        """Save features to file (path like or file object like).

        Descriptors are stored as uint8 for binary / uchar-normalized feature
        types (MLDB AKAZE, uchar-normalized HAHOG, ORB) and float32 otherwise.

        Raises:
            RuntimeError: if the features carry no descriptors.
        """
        feature_type = config["feature_type"]
        if (
            (
                feature_type == "AKAZE"
                and config["akaze_descriptor"] in ["MLDB_UPRIGHT", "MLDB"]
            )
            or (feature_type == "HAHOG" and config["hahog_normalize_to_uchar"])
            or (feature_type == "ORB")
        ):
            feature_data_type = np.uint8
        else:
            feature_data_type = np.float32
        descriptors = self.descriptors
        if descriptors is None:
            raise RuntimeError("No descriptors found, cannot save features data.")
        semantic = self.semantic
        if semantic is not None:
            segmentations = semantic.segmentation
            instances = semantic.instances
            segmentation_labels = semantic.labels
        else:
            segmentations = None
            instances = None
            segmentation_labels = None
        # NOTE(review): `allow_pickle` is not a parameter of savez_compressed;
        # it is persisted as an extra array in the archive. Kept as-is for
        # byte-compatibility with previously written feature files.
        np.savez_compressed(
            fileobject,
            points=self.points.astype(np.float32),
            descriptors=descriptors.astype(feature_data_type),
            colors=self.colors,
            segmentations=segmentations,
            instances=instances,
            segmentation_labels=segmentation_labels,
            OPENSFM_FEATURES_VERSION=self.FEATURES_VERSION,
            allow_pickle=True,
        )

    @classmethod
    def from_file(cls, fileobject: Any, config: Dict[str, Any]) -> "FeaturesData":
        """Load features from file (path like or file object like)."""
        s = np.load(fileobject, allow_pickle=True)
        version = cls._features_file_version(s)
        return getattr(cls, "_from_file_v%d" % version)(s, config)

    @classmethod
    def _features_file_version(cls, obj: Dict[str, Any]) -> int:
        """Retrieve features file version. Return 0 if none."""
        if cls.FEATURES_HEADER in obj:
            return obj[cls.FEATURES_HEADER]
        else:
            return 0

    @classmethod
    def _read_descriptors(
        cls, data: Dict[str, Any], config: Dict[str, Any]
    ) -> np.ndarray:
        """Read descriptors, converting uchar-normalized HAHOG back to float32."""
        descriptors = data["descriptors"]
        if config["feature_type"] == "HAHOG" and config["hahog_normalize_to_uchar"]:
            descriptors = descriptors.astype(np.float32)
        return descriptors

    @classmethod
    def _from_file_v0(
        cls, data: Dict[str, np.ndarray], config: Dict[str, Any]
    ) -> "FeaturesData":
        """Base version of features file.

        Scale (desc[2]) set to reprojection_error_sd by default (legacy behaviour)
        """
        descriptors = cls._read_descriptors(data, config)
        points = data["points"]
        # Legacy files carry no meaningful scale: overwrite it with the
        # configured reprojection error.
        points[:, 2:3] = config["reprojection_error_sd"]
        return FeaturesData(points, descriptors, data["colors"].astype(float), None)

    @classmethod
    def _from_file_v1(
        cls, data: Dict[str, np.ndarray], config: Dict[str, Any]
    ) -> "FeaturesData":
        """Version 1 of features file.

        Scale is not properly set higher in the pipeline, default is gone.
        """
        descriptors = cls._read_descriptors(data, config)
        return FeaturesData(
            data["points"], descriptors, data["colors"].astype(float), None
        )

    @classmethod
    def _from_file_v2(
        cls,
        data: Dict[str, Any],
        config: Dict[str, Any],
    ) -> "FeaturesData":
        """Version 2 of features file.

        Added segmentation and segmentation labels.
        """
        descriptors = cls._read_descriptors(data, config)
        # Elementwise comparison with None is intentional: a missing field is
        # stored as a 0-d object array holding None.
        has_segmentation = (data["segmentations"] != None).all()  # noqa: E711
        has_instances = (data["instances"] != None).all()  # noqa: E711
        if has_segmentation or has_instances:
            semantic_data = SemanticData(
                data["segmentations"] if has_segmentation else None,
                data["instances"] if has_instances else None,
                data["segmentation_labels"],
            )
        else:
            semantic_data = None
        return FeaturesData(
            data["points"], descriptors, data["colors"].astype(float), semantic_data
        )
def resized_image(image: np.ndarray, max_size: int) -> np.ndarray:
    """Return `image` downscaled so its longest side is `max_size` pixels.

    Images already small enough, or a non-positive `max_size`, are returned
    unchanged.
    """
    height, width = image.shape[:2]
    largest = max(width, height)
    if not (0 < max_size < largest):
        return image
    new_size = (width * max_size // largest, height * max_size // largest)
    return cv2.resize(image, dsize=new_size, interpolation=cv2.INTER_AREA)
def root_feature(desc: np.ndarray, l2_normalization: bool = False) -> np.ndarray:
    """Apply the square-root (RootSIFT-style) mapping to each descriptor row.

    Rows are L1-normalized and then square-rooted; when `l2_normalization`
    is set, an L2 normalization is applied first.
    """
    if l2_normalization:
        l2_norms = np.linalg.norm(desc, axis=1)
        desc = (desc.T / l2_norms).T
    l1_norms = np.sum(desc, 1)
    return np.sqrt(desc.T / l1_norms).T
def root_feature_surf(
    desc: np.ndarray, l2_normalization: bool = False, partial: bool = False
) -> np.ndarray:
    """Square-root mapping of SURF-like descriptors (64-dim only).

    Non-64-dimensional input is returned untouched. With `partial`, only
    columns whose index is 2 or 3 modulo 4 are remapped. The input array is
    modified in place unless `l2_normalization` rebinds it first.
    """
    if desc.shape[1] != 64:
        return desc
    if l2_normalization:
        norms = np.linalg.norm(desc, axis=1)
        desc = (desc.T / norms).T
    if partial:
        columns = np.array([i for i in range(64) if i % 4 in (2, 3)])
    else:
        columns = np.arange(64)
    magnitudes = np.abs(desc[:, columns])
    signs = np.sign(desc[:, columns])
    # Normalize by the full L1 norm of each descriptor before the root; the
    # commented-out partial normalization in the original gave only slightly
    # better results for AKAZE SURF.
    totals = np.sum(np.abs(desc), 1)
    desc[:, columns] = np.sqrt(magnitudes.T / totals).T * signs
    return desc
def normalized_image_coordinates(
    pixel_coords: np.ndarray, width: int, height: int
) -> np.ndarray:
    """Map pixel coordinates to normalized image coordinates.

    The frame is centered on the image and scaled by the larger dimension,
    with pixel centers at half-integer offsets.
    """
    size = max(width, height)
    offsets = np.array([width / 2.0, height / 2.0])
    return (pixel_coords[:, :2] + 0.5 - offsets) / size
def denormalized_image_coordinates(
    norm_coords: np.ndarray, width: int, height: int
) -> np.ndarray:
    """Inverse of `normalized_image_coordinates`: map back to pixel coordinates."""
    size = max(width, height)
    offsets = np.array([width / 2.0, height / 2.0])
    return norm_coords[:, :2] * size - 0.5 + offsets
def normalize_features(
    points: np.ndarray, desc: np.ndarray, colors: np.ndarray, width: int, height: int
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Normalize feature coordinates and size, modifying `points` in place.

    Columns 0-1 become centered normalized image coordinates and column 2
    (the feature size) is scaled by the larger image dimension.
    """
    size = max(width, height)
    points[:, 0] = (points[:, 0] + 0.5 - width / 2.0) / size
    points[:, 1] = (points[:, 1] + 0.5 - height / 2.0) / size
    points[:, 2] /= size
    return points, desc, colors
def _in_mask(point: np.ndarray, width: int, height: int, mask: np.ndarray) -> bool:
    """Check whether a point (pixel coords in a width x height image) lands on
    a non-zero cell of `mask`, which may have a different resolution."""
    mask_height, mask_width = mask.shape[:2]
    col = int(mask_width * (point[0] + 0.5) / width)
    row = int(mask_height * (point[1] + 0.5) / height)
    return mask[row, col] != 0
def extract_features_sift(
    image: np.ndarray, config: Dict[str, Any], features_count: int
) -> Tuple[np.ndarray, np.ndarray]:
    """Detect and describe SIFT features on a gray-scale image.

    The peak (contrast) threshold is repeatedly lowered (by a factor of 2/3)
    until at least `features_count` keypoints are detected or the threshold
    reaches the 0.0001 floor.

    Returns:
        (points, desc): points rows are (x, y, size, angle) in pixel
        coordinates; desc is the descriptor matrix. Both are empty (0, 3)
        arrays when descriptor computation fails.
    """
    sift_edge_threshold = config["sift_edge_threshold"]
    sift_peak_threshold = float(config["sift_peak_threshold"])
    # SIFT support is in cv2 main from version 4.4.0
    if context.OPENCV44 or context.OPENCV5:
        # OpenCV versions concerned /** 3.4.11, >= 4.4.0 **/ ==> Sift became free since March 2020
        detector = cv2.SIFT_create(
            edgeThreshold=sift_edge_threshold, contrastThreshold=sift_peak_threshold
        )
        descriptor = detector
    elif context.OPENCV3 or context.OPENCV4:
        try:
            # OpenCV versions concerned /** 3.2.x, 3.3.x, 3.4.0, 3.4.1, 3.4.2, 3.4.10, 4.3.0, 4.4.0 **/
            detector = cv2.xfeatures2d.SIFT_create(
                edgeThreshold=sift_edge_threshold, contrastThreshold=sift_peak_threshold
            )
        except AttributeError as ae:
            # OpenCV versions concerned /** 3.4.3, 3.4.4, 3.4.5, 3.4.6, 3.4.7, 3.4.8, 3.4.9, 4.0.x, 4.1.x, 4.2.x **/
            if "no attribute 'xfeatures2d'" in str(ae):
                logger.error(
                    "OpenCV Contrib modules are required to extract SIFT features"
                )
            raise
        descriptor = detector
    else:
        # Legacy OpenCV 2.x factory API.
        detector = cv2.FeatureDetector_create("SIFT")
        descriptor = cv2.DescriptorExtractor_create("SIFT")
        detector.setDouble("edgeThreshold", sift_edge_threshold)
    while True:
        logger.debug("Computing sift with threshold {0}".format(sift_peak_threshold))
        t = time.time()
        # SIFT support is in cv2 main from version 4.4.0
        # On the modern APIs the detector is re-created with the updated
        # threshold each iteration; the legacy API uses a setter instead.
        if context.OPENCV44 or context.OPENCV5:
            detector = cv2.SIFT_create(
                edgeThreshold=sift_edge_threshold, contrastThreshold=sift_peak_threshold
            )
        elif context.OPENCV3:
            detector = cv2.xfeatures2d.SIFT_create(
                edgeThreshold=sift_edge_threshold, contrastThreshold=sift_peak_threshold
            )
        else:
            detector.setDouble("contrastThreshold", sift_peak_threshold)
        points = detector.detect(image)
        logger.debug("Found {0} points in {1}s".format(len(points), time.time() - t))
        if len(points) < features_count and sift_peak_threshold > 0.0001:
            # Not enough keypoints: relax the contrast threshold and retry.
            sift_peak_threshold = (sift_peak_threshold * 2) / 3
            logger.debug("reducing threshold")
        else:
            logger.debug("done")
            break
    points, desc = descriptor.compute(image, points)
    if desc is not None:
        if config["feature_root"]:
            # Square-root (RootSIFT) mapping of the descriptors.
            desc = root_feature(desc)
        points = np.array([(i.pt[0], i.pt[1], i.size, i.angle) for i in points])
    else:
        points = np.array(np.zeros((0, 3)))
        desc = np.array(np.zeros((0, 3)))
    return points, desc
def extract_features_surf(
    image: np.ndarray, config: Dict[str, Any], features_count: int
) -> Tuple[np.ndarray, np.ndarray]:
    """Detect and describe SURF features on a gray-scale image.

    The Hessian threshold is repeatedly lowered (by a factor of 2/3) until
    at least `features_count` keypoints are detected or the threshold
    reaches the 0.0001 floor.

    Returns:
        (points, desc): points rows are (x, y, size, angle) in pixel
        coordinates; desc is the descriptor matrix. Both are empty (0, 3)
        arrays when descriptor computation fails.
    """
    surf_hessian_threshold = config["surf_hessian_threshold"]
    if context.OPENCV3:
        try:
            detector = cv2.xfeatures2d.SURF_create()
        except AttributeError as ae:
            if "no attribute 'xfeatures2d'" in str(ae):
                logger.error(
                    "OpenCV Contrib modules are required to extract SURF features"
                )
            raise
        descriptor = detector
        detector.setHessianThreshold(surf_hessian_threshold)
        detector.setNOctaves(config["surf_n_octaves"])
        detector.setNOctaveLayers(config["surf_n_octavelayers"])
        detector.setUpright(config["surf_upright"])
    else:
        # Legacy OpenCV 2.x factory API.
        detector = cv2.FeatureDetector_create("SURF")
        descriptor = cv2.DescriptorExtractor_create("SURF")
        detector.setDouble("hessianThreshold", surf_hessian_threshold)
        detector.setDouble("nOctaves", config["surf_n_octaves"])
        detector.setDouble("nOctaveLayers", config["surf_n_octavelayers"])
        detector.setInt("upright", config["surf_upright"])
    while True:
        logger.debug("Computing surf with threshold {0}".format(surf_hessian_threshold))
        t = time.time()
        if context.OPENCV3:
            detector.setHessianThreshold(surf_hessian_threshold)
        else:
            detector.setDouble(
                "hessianThreshold", surf_hessian_threshold
            )  # default: 0.04
        points = detector.detect(image)
        logger.debug("Found {0} points in {1}s".format(len(points), time.time() - t))
        if len(points) < features_count and surf_hessian_threshold > 0.0001:
            # Not enough keypoints: relax the Hessian threshold and retry.
            surf_hessian_threshold = (surf_hessian_threshold * 2) / 3
            logger.debug("reducing threshold")
        else:
            logger.debug("done")
            break
    points, desc = descriptor.compute(image, points)
    if desc is not None:
        if config["feature_root"]:
            # Square-root (RootSIFT-style) mapping of the descriptors.
            desc = root_feature(desc)
        points = np.array([(i.pt[0], i.pt[1], i.size, i.angle) for i in points])
    else:
        points = np.array(np.zeros((0, 3)))
        desc = np.array(np.zeros((0, 3)))
    return points, desc
def akaze_descriptor_type(name: str) -> pyfeatures.AkazeDescriptorType:
    """Map a descriptor name from the config to an AKAZE descriptor type.

    Unknown names fall back to MSURF instead of failing, so extraction can
    proceed with a misconfigured value.
    """
    d = pyfeatures.AkazeDescriptorType.__dict__
    if name in d:
        return d[name]
    # A wrong config value is worth surfacing: log at WARNING (was DEBUG,
    # which hid the silent fallback from the user).
    logger.warning("Wrong akaze descriptor type")
    return d["MSURF"]
def extract_features_akaze(
    image: np.ndarray, config: Dict[str, Any], features_count: int
) -> Tuple[np.ndarray, np.ndarray]:
    """Detect and describe AKAZE features via the pyfeatures binding.

    Returns (points, desc) with points cast to float.
    """
    descriptor_name = config["akaze_descriptor"]

    options = pyfeatures.AKAZEOptions()
    options.omax = config["akaze_omax"]
    options.descriptor = akaze_descriptor_type(descriptor_name)
    options.descriptor_size = config["akaze_descriptor_size"]
    options.descriptor_channels = config["akaze_descriptor_channels"]
    options.dthreshold = config["akaze_dthreshold"]
    options.kcontrast_percentile = config["akaze_kcontrast_percentile"]
    options.use_isotropic_diffusion = config["akaze_use_isotropic_diffusion"]
    options.target_num_features = features_count
    options.use_adaptive_suppression = config["feature_use_adaptive_suppression"]

    logger.debug("Computing AKAZE with threshold {0}".format(options.dthreshold))
    start = time.time()
    points, desc = pyfeatures.akaze(image, options)
    logger.debug("Found {0} points in {1}s".format(len(points), time.time() - start))

    if config["feature_root"]:
        # SURF-family descriptors get the square-root mapping; other
        # descriptor types (e.g. MLDB) are left untouched.
        if descriptor_name in ["SURF_UPRIGHT", "MSURF_UPRIGHT"]:
            desc = root_feature_surf(desc, partial=True)
        elif descriptor_name in ["SURF", "MSURF"]:
            desc = root_feature_surf(desc, partial=False)

    return points.astype(float), desc
def extract_features_hahog(
    image: np.ndarray, config: Dict[str, Any], features_count: int
) -> Tuple[np.ndarray, np.ndarray]:
    """Detect and describe HAHOG features via the pyfeatures binding."""
    start = time.time()
    points, desc = pyfeatures.hahog(
        image.astype(np.float32) / 255,  # VlFeat expects pixel values between 0, 1
        peak_threshold=config["hahog_peak_threshold"],
        edge_threshold=config["hahog_edge_threshold"],
        target_num_features=features_count,
    )

    rooted = config["feature_root"]
    if rooted:
        desc = np.sqrt(desc)

    if config["hahog_normalize_to_uchar"]:
        # x * 512 < 256 => sqrt(x) * 362 < 256, so rooted descriptors use
        # the smaller scaling factor.
        scaling = 362 if rooted else 512
        desc = (scaling * desc).clip(0, 255).round()

    logger.debug("Found {0} points in {1}s".format(len(points), time.time() - start))
    return points, desc
def extract_features_orb(
    image: np.ndarray, config: Dict[str, Any], features_count: int
) -> Tuple[np.ndarray, np.ndarray]:
    """Detect and describe ORB features, supporting both OpenCV APIs.

    Returns (points, desc); both are empty (0, 3) arrays when descriptor
    computation fails.
    """
    if context.OPENCV3:
        detector = cv2.ORB_create(nfeatures=features_count)
        descriptor = detector
    else:
        # Legacy OpenCV 2.x factory API.
        detector = cv2.FeatureDetector_create("ORB")
        descriptor = cv2.DescriptorExtractor_create("ORB")
        detector.setDouble("nFeatures", features_count)

    logger.debug("Computing ORB")
    start = time.time()
    keypoints = detector.detect(image)

    keypoints, desc = descriptor.compute(image, keypoints)
    if desc is None:
        points = np.array(np.zeros((0, 3)))
        desc = np.array(np.zeros((0, 3)))
    else:
        points = np.array(
            [(kp.pt[0], kp.pt[1], kp.size, kp.angle) for kp in keypoints]
        )

    logger.debug("Found {0} points in {1}s".format(len(points), time.time() - start))
    return points, desc
def extract_features(
    image: np.ndarray, config: Dict[str, Any], is_panorama: bool
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Detect features in a color or gray-scale image.

    The type of feature detected is determined by the ``feature_type``
    config option. The coordinates of the detected points are returned in
    normalized image coordinates.

    Parameters:
        - image: a color image with shape (h, w, 3) or
          gray-scale image with (h, w) or (h, w, 1)
        - config: the configuration structure
        - is_panorama: if True, alternate settings are used for feature
          count and extraction size.

    Returns:
        tuple:
        - points: ``x``, ``y``, ``size`` and ``angle`` for each feature
        - descriptors: the descriptor of each feature
        - colors: the color of the center of each feature
    """
    if is_panorama:
        extraction_size = config["feature_process_size_panorama"]
        features_count = config["feature_min_frames_panorama"]
    else:
        extraction_size = config["feature_process_size"]
        features_count = config["feature_min_frames"]

    assert image.ndim in (2, 3)
    image = resized_image(image, extraction_size)
    if image.ndim == 2:
        # Convert (h, w) to (h, w, 1).
        image = np.expand_dims(image, axis=2)
    # Convert color to gray-scale if necessary.
    if image.shape[2] == 3:
        image_gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
    else:
        image_gray = image

    extractors = {
        "SIFT": extract_features_sift,
        "SURF": extract_features_surf,
        "AKAZE": extract_features_akaze,
        "HAHOG": extract_features_hahog,
        "ORB": extract_features_orb,
    }
    feature_type = config["feature_type"].upper()
    extractor = extractors.get(feature_type)
    if extractor is None:
        raise ValueError(
            "Unknown feature type (must be SURF, SIFT, AKAZE, HAHOG or ORB)"
        )
    points, desc = extractor(image_gray, config, features_count)

    # Sample the (possibly color) image at each keypoint center.
    xs = points[:, 0].round().astype(int)
    ys = points[:, 1].round().astype(int)
    colors = image[ys, xs]
    if image.shape[2] == 1:
        colors = np.repeat(colors, 3).reshape((-1, 3))

    return normalize_features(points, desc, colors, image.shape[1], image.shape[0])
def build_flann_index(descriptors: np.ndarray, config: Dict[str, Any]) -> Any:
    """Build a FLANN index over float32 descriptors.

    Raises:
        ValueError: for binary (non-float32) descriptors, or for an unknown
            ``flann_algorithm`` config value.
    """
    # FLANN algorithm identifiers:
    # LINEAR = 0, KDTREE = 1, KMEANS = 2, COMPOSITE = 3,
    # KDTREE_SINGLE = 4, HIERARCHICAL = 5
    if descriptors.dtype.type is not np.float32:
        raise ValueError(
            "FLANN isn't supported for binary features because of poor-performance. Use BRUTEFORCE instead."
        )

    algorithm_type = config["flann_algorithm"].upper()
    if algorithm_type == "KMEANS":
        index_method = 2
    elif algorithm_type == "KDTREE":
        index_method = 1
    else:
        raise ValueError("Unknown flann algorithm type must be KMEANS, KDTREE")

    flann_params = {
        "algorithm": index_method,
        "branching": config["flann_branching"],
        "iterations": config["flann_iterations"],
        "tree": config["flann_tree"],
    }
    return context.flann_Index(descriptors, flann_params)