opensfm/matching.py (763 lines of code) (raw):
import logging
from timeit import default_timer as timer
from typing import Optional, Dict, Any, Tuple, List, Generator
import cv2
import numpy as np
from opensfm import (
context,
feature_loader,
log,
multiview,
pairs_selection,
pyfeatures,
pygeometry,
)
from opensfm.dataset_base import DataSetBase
logger = logging.getLogger(__name__)
def clear_cache() -> None:
feature_loader.instance.clear_cache()
def match_images(
data: DataSetBase,
config_override: Dict[str, Any],
ref_images: List[str],
cand_images: List[str],
) -> Tuple[Dict[Tuple[str, str], List[Tuple[int, int]]], Dict[str, Any]]:
"""Perform pair matchings between two sets of images.
It will do matching for each pair (i, j), i being in
ref_images and j in cand_images, taking assumption that
matching(i, j) == matching(j ,i). This does not hold for
non-symmetric matching options like WORDS. Data will be
stored in i matching only.
"""
# Get EXIFs data
all_images = list(set(ref_images + cand_images))
exifs = {im: data.load_exif(im) for im in all_images}
# Generate pairs for matching
pairs, preport = pairs_selection.match_candidates_from_metadata(
ref_images,
cand_images,
exifs,
data,
config_override,
)
# Match them !
return (
match_images_with_pairs(data, config_override, exifs, pairs),
preport,
)
def match_images_with_pairs(
data: DataSetBase,
config_override: Dict[str, Any],
exifs: Dict[str, Any],
pairs: List[Tuple[str, str]],
poses: Optional[Dict[str, pygeometry.Pose]] = None,
) -> Dict[Tuple[str, str], List[Tuple[int, int]]]:
"""Perform pair matchings given pairs."""
cameras = data.load_camera_models()
args = list(match_arguments(pairs, data, config_override, cameras, exifs, poses))
# Perform all pair matchings in parallel
start = timer()
logger.info("Matching {} image pairs".format(len(pairs)))
processes = config_override.get("processes", data.config["processes"])
mem_per_process = 512
jobs_per_process = 2
processes = context.processes_that_fit_in_memory(processes, mem_per_process)
logger.info("Computing pair matching with %d processes" % processes)
matches = context.parallel_map(match_unwrap_args, args, processes, jobs_per_process)
logger.info(
"Matched {} pairs {} in {} seconds ({} seconds/pair).".format(
len(pairs),
log_projection_types(pairs, exifs, cameras),
timer() - start,
(timer() - start) / len(pairs) if pairs else 0,
)
)
# Index results per pair
resulting_pairs = {}
for im1, im2, m in matches:
resulting_pairs[im1, im2] = m
return resulting_pairs
def log_projection_types(
pairs: List[Tuple[str, str]],
exifs: Dict[str, Any],
cameras: Dict[str, pygeometry.Camera],
) -> str:
if not pairs:
return ""
projection_type_pairs = {}
for im1, im2 in pairs:
pt1 = cameras[exifs[im1]["camera"]].projection_type
pt2 = cameras[exifs[im2]["camera"]].projection_type
if pt1 not in projection_type_pairs:
projection_type_pairs[pt1] = {}
if pt2 not in projection_type_pairs[pt1]:
projection_type_pairs[pt1][pt2] = []
projection_type_pairs[pt1][pt2].append((im1, im2))
output = "("
for pt1 in projection_type_pairs:
for pt2 in projection_type_pairs[pt1]:
output += "{}-{}: {}, ".format(
pt1, pt2, len(projection_type_pairs[pt1][pt2])
)
return output[:-2] + ")"
def save_matches(
data: DataSetBase,
images_ref: List[str],
matched_pairs: Dict[Tuple[str, str], List[Tuple[int, int]]],
) -> None:
"""Given pairwise matches (image 1, image 2) - > matches,
save them such as only {image E images_ref} will store the matches.
"""
images_ref_set = set(images_ref)
matches_per_im1 = {im: {} for im in images_ref}
for (im1, im2), m in matched_pairs.items():
if im1 in images_ref_set:
matches_per_im1[im1][im2] = m
elif im2 in images_ref_set:
matches_per_im1[im2][im1] = m
else:
raise RuntimeError(
"Couldn't save matches for {}. No image found in images_ref.".format(
(im1, im2)
)
)
for im1, im1_matches in matches_per_im1.items():
data.save_matches(im1, im1_matches)
def match_arguments(
pairs: List[Tuple[str, str]],
data: DataSetBase,
config_override: Dict[str, Any],
cameras: Dict[str, pygeometry.Camera],
exifs: Dict[str, pygeometry.Camera],
poses: Optional[Dict[str, pygeometry.Pose]],
) -> Generator[
Tuple[
str,
str,
Dict[str, pygeometry.Camera],
Dict[str, pygeometry.Camera],
DataSetBase,
Dict[str, Any],
Optional[Dict[str, pygeometry.Pose]],
],
None,
None,
]:
"""Generate arguments for parralel processing of pair matching"""
for im1, im2 in pairs:
yield im1, im2, cameras, exifs, data, config_override, poses
def match_unwrap_args(
args: Tuple[
str,
str,
Dict[str, pygeometry.Camera],
Dict[str, Any],
DataSetBase,
Dict[str, Any],
Optional[Dict[str, pygeometry.Pose]],
]
) -> Tuple[str, str, np.ndarray]:
"""Wrapper for parallel processing of pair matching.
Compute all pair matchings of a given image and save them.
"""
log.setup()
im1 = args[0]
im2 = args[1]
cameras = args[2]
exifs = args[3]
data: DataSetBase = args[4]
config_override = args[5]
poses = args[6]
if poses:
pose1 = poses[im1]
pose2 = poses[im2]
pose = pose2.relative_to(pose1)
else:
pose = None
camera1 = cameras[exifs[im1]["camera"]]
camera2 = cameras[exifs[im2]["camera"]]
matches = match(im1, im2, camera1, camera2, data, config_override, pose)
return im1, im2, matches
def match_descriptors(
im1: str,
im2: str,
camera1: pygeometry.Camera,
camera2: pygeometry.Camera,
data: DataSetBase,
config_override: Dict[str, Any],
) -> np.ndarray:
"""Perform descriptor matching for a pair of images."""
# Override parameters
overriden_config = data.config.copy()
overriden_config.update(config_override)
# Run descriptor matching
time_start = timer()
_, _, matches, matcher_type = _match_descriptors_impl(
im1, im2, camera1, camera2, data, overriden_config
)
time_2d_matching = timer() - time_start
# From indexes in filtered sets, to indexes in original sets of features
matches_unfiltered = []
m1 = feature_loader.instance.load_mask(data, im1)
m2 = feature_loader.instance.load_mask(data, im2)
if m1 is not None and m2 is not None:
matches_unfiltered = unfilter_matches(matches, m1, m2)
symmetric = "symmetric" if overriden_config["symmetric_matching"] else "one-way"
logger.debug(
"Matching {} and {}. Matcher: {} ({}) "
"T-desc: {:1.3f} Matches: {}".format(
im1,
im2,
matcher_type,
symmetric,
time_2d_matching,
len(matches_unfiltered),
)
)
return np.array(matches_unfiltered, dtype=int)
def _match_descriptors_guided_impl(
im1: str,
im2: str,
camera1: pygeometry.Camera,
camera2: pygeometry.Camera,
relative_pose: pygeometry.Pose,
data: DataSetBase,
overriden_config: Dict[str, Any],
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, str]:
"""Perform descriptor guided matching for a pair of images, using their relative pose. It also apply static objects removal."""
guided_matcher_override = "BRUTEFORCE"
matcher_type = overriden_config["matcher_type"].upper()
symmetric_matching = overriden_config["symmetric_matching"]
if matcher_type in ["WORDS", "FLANN"] or symmetric_matching:
logger.warning(
f"{matcher_type} and/or symmetric isn't supported for guided matching, switching to asymmetric {guided_matcher_override}"
)
matcher_type = guided_matcher_override
# Will apply mask to features if any
dummy = np.array([])
segmentation_in_descriptor = overriden_config["matching_use_segmentation"]
features_data1 = feature_loader.instance.load_all_data(
data,
im1,
masked=True,
segmentation_in_descriptor=segmentation_in_descriptor,
)
features_data2 = feature_loader.instance.load_all_data(
data, im2, masked=True, segmentation_in_descriptor=segmentation_in_descriptor
)
bearings1 = feature_loader.instance.load_bearings(
data, im1, masked=True, camera=camera1
)
bearings2 = feature_loader.instance.load_bearings(
data, im2, masked=True, camera=camera2
)
if (
features_data1 is None
or bearings1 is None
or len(features_data1.points) < 2
or features_data2 is None
or bearings2 is None
or len(features_data2.points) < 2
):
return dummy, dummy, dummy, matcher_type
d1 = features_data1.descriptors
d2 = features_data2.descriptors
if d1 is None or d2 is None:
return dummy, dummy, dummy, matcher_type
epipolar_mask = compute_inliers_bearing_epipolar(
bearings1,
bearings2,
relative_pose,
overriden_config["guided_matching_threshold"],
)
matches = match_brute_force_symmetric(d1, d2, overriden_config, epipolar_mask)
# Adhoc filters
if overriden_config["matching_use_filters"]:
matches = apply_adhoc_filters(
data,
matches,
im1,
camera1,
features_data1.points,
im2,
camera2,
features_data2.points,
)
return (
features_data1.points,
features_data2.points,
np.array(matches, dtype=int),
matcher_type,
)
def _match_descriptors_impl(
im1: str,
im2: str,
camera1: pygeometry.Camera,
camera2: pygeometry.Camera,
data: DataSetBase,
overriden_config: Dict[str, Any],
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, str]:
"""Perform descriptor matching for a pair of images. It also apply static objects removal."""
dummy = np.array([])
matcher_type = overriden_config["matcher_type"].upper()
dummy_ret = dummy, dummy, dummy, matcher_type
# Will apply mask to features if any
dummy = np.array([])
segmentation_in_descriptor = overriden_config["matching_use_segmentation"]
features_data1 = feature_loader.instance.load_all_data(
data, im1, masked=True, segmentation_in_descriptor=segmentation_in_descriptor
)
features_data2 = feature_loader.instance.load_all_data(
data, im2, masked=True, segmentation_in_descriptor=segmentation_in_descriptor
)
if (
features_data1 is None
or len(features_data1.points) < 2
or features_data2 is None
or len(features_data2.points) < 2
):
return dummy_ret
d1 = features_data1.descriptors
d2 = features_data2.descriptors
if d1 is None or d2 is None:
return dummy_ret
# Prevent using FLANN + Binary (np.uint8)
binary_matcher_override = "BRUTEFORCE"
if matcher_type == "FLANN" and d1.dtype == np.uint8:
logger.warning(
f"{matcher_type} for binary descriptors offers poor performance. Switching to {binary_matcher_override}."
)
matcher_type = binary_matcher_override
symmetric_matching = overriden_config["symmetric_matching"]
if matcher_type == "WORDS":
words1 = feature_loader.instance.load_words(data, im1, masked=True)
words2 = feature_loader.instance.load_words(data, im2, masked=True)
if words1 is None or words2 is None:
return dummy_ret
if symmetric_matching:
matches = match_words_symmetric(
d1,
words1,
d2,
words2,
overriden_config,
)
else:
matches = match_words(
d1,
words1,
d2,
words2,
overriden_config,
)
elif matcher_type == "FLANN":
f1 = feature_loader.instance.load_features_index(
data,
im1,
masked=True,
segmentation_in_descriptor=segmentation_in_descriptor,
)
if not f1:
return dummy_ret
feat_data_index1, index1 = f1
if symmetric_matching:
f2 = feature_loader.instance.load_features_index(
data,
im2,
masked=True,
segmentation_in_descriptor=segmentation_in_descriptor,
)
if not f2:
return dummy_ret
feat_data_index2, index2 = f2
descriptors1 = feat_data_index1.descriptors
descriptors2 = feat_data_index2.descriptors
if descriptors1 is None or descriptors2 is None:
return dummy_ret
matches = match_flann_symmetric(
descriptors1,
index1,
descriptors2,
index2,
overriden_config,
)
else:
matches = match_flann(index1, d2, overriden_config)
elif matcher_type == "BRUTEFORCE":
if symmetric_matching:
matches = match_brute_force_symmetric(d1, d2, overriden_config)
else:
matches = match_brute_force(d1, d2, overriden_config)
else:
raise ValueError("Invalid matcher_type: {}".format(matcher_type))
# Adhoc filters
if overriden_config["matching_use_filters"]:
matches = apply_adhoc_filters(
data,
matches,
im1,
camera1,
features_data1.points,
im2,
camera2,
features_data2.points,
)
return (
features_data1.points,
features_data2.points,
np.array(matches, dtype=int),
matcher_type,
)
def match_robust(
im1: str,
im2: str,
matches,
camera1: pygeometry.Camera,
camera2: pygeometry.Camera,
data: DataSetBase,
config_override: Dict[str, Any],
) -> np.ndarray:
"""Perform robust geometry matching on a set of matched descriptors indexes."""
# Override parameters
overriden_config = data.config.copy()
overriden_config.update(config_override)
# Will apply mask to features if any
segmentation_in_descriptor = overriden_config[
"matching_use_segmentation"
] # unused but keep using the same cache
features_data1 = feature_loader.instance.load_all_data(
data, im1, masked=True, segmentation_in_descriptor=segmentation_in_descriptor
)
features_data2 = feature_loader.instance.load_all_data(
data, im2, masked=True, segmentation_in_descriptor=segmentation_in_descriptor
)
if (
features_data1 is None
or len(features_data1.points) < 2
or features_data2 is None
or len(features_data2.points) < 2
):
return np.array([])
# Run robust matching
np_matches = np.array(matches, dtype=int)
t = timer()
rmatches = _match_robust_impl(
im1,
im2,
features_data1.points,
features_data2.points,
np_matches,
camera1,
camera2,
data,
overriden_config,
)
time_robust_matching = timer() - t
# From indexes in filtered sets, to indexes in original sets of features
rmatches_unfiltered = []
m1 = feature_loader.instance.load_mask(data, im1)
m2 = feature_loader.instance.load_mask(data, im2)
if m1 is not None and m2 is not None:
rmatches_unfiltered = unfilter_matches(rmatches, m1, m2)
robust_matching_min_match = overriden_config["robust_matching_min_match"]
logger.debug(
"Matching {} and {}. T-robust: {:1.3f} "
"Matches: {} Robust: {} Success: {}".format(
im1,
im2,
time_robust_matching,
len(matches),
len(rmatches_unfiltered),
len(rmatches_unfiltered) >= robust_matching_min_match,
)
)
if len(rmatches_unfiltered) < robust_matching_min_match:
return np.array([])
return np.array(rmatches_unfiltered, dtype=int)
def _match_robust_impl(
im1: str,
im2: str,
p1: np.ndarray,
p2: np.ndarray,
matches: np.ndarray,
camera1: pygeometry.Camera,
camera2: pygeometry.Camera,
data: DataSetBase,
overriden_config: Dict[str, Any],
) -> np.ndarray:
"""Perform robust geometry matching on a set of matched descriptors indexes."""
# robust matching
rmatches = robust_match(p1, p2, camera1, camera2, matches, overriden_config)
rmatches = np.array([[a, b] for a, b in rmatches])
return rmatches
def match(
im1: str,
im2: str,
camera1: pygeometry.Camera,
camera2: pygeometry.Camera,
data: DataSetBase,
config_override: Dict[str, Any],
guided_matching_pose: Optional[pygeometry.Pose],
) -> np.ndarray:
"""Perform full matching (descriptor+robust, optionally guided) for a pair of images."""
# Override parameters
overriden_config = data.config.copy()
overriden_config.update(config_override)
# Run descriptor matching
time_start = timer()
if guided_matching_pose:
p1, p2, matches, matcher_type = _match_descriptors_guided_impl(
im1, im2, camera1, camera2, guided_matching_pose, data, overriden_config
)
else:
p1, p2, matches, matcher_type = _match_descriptors_impl(
im1, im2, camera1, camera2, data, overriden_config
)
time_2d_matching = timer() - time_start
symmetric = "symmetric" if overriden_config["symmetric_matching"] else "one-way"
robust_matching_min_match = overriden_config["robust_matching_min_match"]
if len(matches) < robust_matching_min_match:
logger.debug(
"Matching {} and {}. Matcher: {} ({}) T-desc: {:1.3f} "
"Matches: FAILED".format(
im1, im2, matcher_type, symmetric, time_2d_matching
)
)
return np.array([])
# Run robust matching (non guided case only)
t = timer()
rmatches = _match_robust_impl(
im1, im2, p1, p2, matches, camera1, camera2, data, overriden_config
)
time_robust_matching = timer() - t
# From indexes in filtered sets, to indexes in original sets of features
m1 = feature_loader.instance.load_mask(data, im1)
m2 = feature_loader.instance.load_mask(data, im2)
if m1 is not None and m2 is not None:
rmatches = unfilter_matches(rmatches, m1, m2)
time_total = timer() - time_start
logger.debug(
"Matching {} and {}. Matcher: {} ({}) "
"T-desc: {:1.3f} T-robust: {:1.3f} T-total: {:1.3f} "
"Matches: {} Robust: {} Success: {}".format(
im1,
im2,
matcher_type,
symmetric,
time_2d_matching,
time_robust_matching,
time_total,
len(matches),
len(rmatches),
len(rmatches) >= robust_matching_min_match,
)
)
if len(rmatches) < robust_matching_min_match:
return np.array([])
return np.array(rmatches, dtype=int)
def match_words(
f1: np.ndarray,
words1: np.ndarray,
f2: np.ndarray,
words2: np.ndarray,
config: Dict[str, Any],
) -> np.ndarray:
"""Match using words and apply Lowe's ratio filter.
Args:
f1: feature descriptors of the first image
w1: the nth closest words for each feature in the first image
f2: feature descriptors of the second image
w2: the nth closest words for each feature in the second image
config: config parameters
"""
ratio = config["lowes_ratio"]
num_checks = config["bow_num_checks"]
return pyfeatures.match_using_words(f1, words1, f2, words2[:, 0], ratio, num_checks)
def match_words_symmetric(
f1: np.ndarray,
words1: np.ndarray,
f2: np.ndarray,
words2: np.ndarray,
config: Dict[str, Any],
) -> List[Tuple[int, int]]:
"""Match using words in both directions and keep consistent matches.
Args:
f1: feature descriptors of the first image
w1: the nth closest words for each feature in the first image
f2: feature descriptors of the second image
w2: the nth closest words for each feature in the second image
config: config parameters
"""
matches_ij = match_words(f1, words1, f2, words2, config)
matches_ji = match_words(f2, words2, f1, words1, config)
matches_ij = [(a, b) for a, b in matches_ij]
matches_ji = [(b, a) for a, b in matches_ji]
return list(set(matches_ij).intersection(set(matches_ji)))
def match_flann(
index: Any, f2: np.ndarray, config: Dict[str, Any]
) -> List[Tuple[int, int]]:
"""Match using FLANN and apply Lowe's ratio filter.
Args:
index: flann index if the first image
f2: feature descriptors of the second image
config: config parameters
"""
search_params = dict(checks=config["flann_checks"])
results, dists = index.knnSearch(f2, 2, params=search_params)
squared_ratio = config["lowes_ratio"] ** 2 # Flann returns squared L2 distances
good = dists[:, 0] < squared_ratio * dists[:, 1]
return list(zip(results[good, 0], good.nonzero()[0]))
def match_flann_symmetric(
fi: np.ndarray, indexi: Any, fj: np.ndarray, indexj: Any, config: Dict[str, Any]
) -> List[Tuple[int, int]]:
"""Match using FLANN in both directions and keep consistent matches.
Args:
fi: feature descriptors of the first image
indexi: flann index if the first image
fj: feature descriptors of the second image
indexj: flann index of the second image
config: config parameters
maskij: optional boolean mask of len(i descriptors) x len(j descriptors)
"""
matches_ij = [(a, b) for a, b in match_flann(indexi, fj, config)]
matches_ji = [(b, a) for a, b in match_flann(indexj, fi, config)]
return list(set(matches_ij).intersection(set(matches_ji)))
def match_brute_force(
f1: np.ndarray,
f2: np.ndarray,
config: Dict[str, Any],
maskij: Optional[np.ndarray] = None,
) -> List[Tuple[int, int]]:
"""Brute force matching and Lowe's ratio filtering.
Args:
f1: feature descriptors of the first image
f2: feature descriptors of the second image
config: config parameters
maskij: optional boolean mask of len(i descriptors) x len(j descriptors)
"""
assert f1.dtype.type == f2.dtype.type
if f1.dtype.type == np.uint8:
matcher_type = "BruteForce-Hamming"
else:
matcher_type = "BruteForce"
matcher = cv2.DescriptorMatcher_create(matcher_type)
matcher.add([f2])
if maskij is not None:
matches = matcher.knnMatch(f1, k=2, masks=np.array([maskij]).astype(np.uint8))
else:
matches = matcher.knnMatch(f1, k=2)
ratio = config["lowes_ratio"]
good_matches = []
for match in matches:
if match and len(match) == 2:
m, n = match
if m.distance < ratio * n.distance:
good_matches.append(m)
return _convert_matches_to_vector(good_matches)
def _convert_matches_to_vector(matches: List[Any]) -> List[Tuple[int, int]]:
"""Convert Dmatch object to matrix form."""
return [(mm.queryIdx, mm.trainIdx) for mm in matches]
def match_brute_force_symmetric(
fi: np.ndarray,
fj: np.ndarray,
config: Dict[str, Any],
maskij: Optional[np.ndarray] = None,
) -> List[Tuple[int, int]]:
"""Match with brute force in both directions and keep consistent matches.
Args:
fi: feature descriptors of the first image
fj: feature descriptors of the second image
config: config parameters
maskij: optional boolean mask of len(i descriptors) x len(j descriptors)
"""
matches_ij = [(a, b) for a, b in match_brute_force(fi, fj, config, maskij)]
maskijT = maskij.T if maskij is not None else None
matches_ji = [(b, a) for a, b in match_brute_force(fj, fi, config, maskijT)]
return list(set(matches_ij).intersection(set(matches_ji)))
def robust_match_fundamental(
p1: np.ndarray,
p2: np.ndarray,
matches: np.ndarray,
config: Dict[str, Any],
) -> Tuple[np.ndarray, np.ndarray]:
"""Filter matches by estimating the Fundamental matrix via RANSAC."""
if len(matches) < 8:
return np.array([]), np.array([])
p1 = p1[matches[:, 0]][:, :2].copy()
p2 = p2[matches[:, 1]][:, :2].copy()
FM_RANSAC = cv2.FM_RANSAC if context.OPENCV3 else cv2.cv.CV_FM_RANSAC
threshold = config["robust_matching_threshold"]
F, mask = cv2.findFundamentalMat(p1, p2, FM_RANSAC, threshold, 0.9999)
inliers = mask.ravel().nonzero()
if F is None or F[2, 2] == 0.0:
return F, np.array([])
return F, matches[inliers]
def compute_inliers_bearings(
b1: np.ndarray, b2: np.ndarray, R: np.ndarray, t: np.ndarray, threshold=0.01
) -> List[bool]:
"""Compute points that can be triangulated.
Args:
b1, b2: Bearings in the two images.
R, t: Rotation and translation from the second image to the first.
That is the convention and the opposite of many
functions in this module.
threshold: max reprojection error in radians.
Returns:
array: Array of boolean indicating inliers/outliers
"""
p = pygeometry.triangulate_two_bearings_midpoint_many(b1, b2, R, t)
good_idx = [i for i in range(len(p)) if p[i][0]]
points = np.array([p[i][1] for i in range(len(p)) if p[i][0]])
inliers = [False] * len(b1)
if len(points) < 1:
return inliers
br1 = points.copy()
br1 /= np.linalg.norm(br1, axis=1)[:, np.newaxis]
br2 = R.T.dot((points - t).T).T
br2 /= np.linalg.norm(br2, axis=1)[:, np.newaxis]
ok1 = np.linalg.norm(br1 - b1[good_idx], axis=1) < threshold
ok2 = np.linalg.norm(br2 - b2[good_idx], axis=1) < threshold
is_ok = ok1 * ok2
for i, ok in enumerate(is_ok):
inliers[good_idx[i]] = ok
return inliers
def compute_inliers_bearing_epipolar(
b1: np.ndarray, b2: np.ndarray, pose: pygeometry.Pose, threshold: float
) -> np.ndarray:
"""Compute mask of epipolarly consistent bearings, given two lists of bearings
Args:
b1, b2: Bearings in the two images. Expected to be normalized.
pose: Pose of the second image wrt. the first one (relative pose)
threshold: max reprojection error in radians.
Returns:
array: Matrix of boolean indicating inliers/outliers
"""
symmetric_angle_error = pygeometry.epipolar_angle_two_bearings_many(
b1.astype(np.float32),
b2.astype(np.float32),
pose.get_R_cam_to_world(),
pose.get_origin(),
)
mask = symmetric_angle_error < threshold
return mask
def robust_match_calibrated(
p1: np.ndarray,
p2: np.ndarray,
camera1: pygeometry.Camera,
camera2: pygeometry.Camera,
matches: np.ndarray,
config: Dict[str, Any],
) -> np.ndarray:
"""Filter matches by estimating the Essential matrix via RANSAC."""
if len(matches) < 8:
return np.array([])
p1 = p1[matches[:, 0]][:, :2].copy()
p2 = p2[matches[:, 1]][:, :2].copy()
b1 = camera1.pixel_bearing_many(p1)
b2 = camera2.pixel_bearing_many(p2)
threshold = config["robust_matching_calib_threshold"]
T = multiview.relative_pose_ransac(b1, b2, threshold, 1000, 0.999)
for relax in [4, 2, 1]:
inliers = compute_inliers_bearings(b1, b2, T[:, :3], T[:, 3], relax * threshold)
if np.sum(inliers) < 8:
return np.array([])
iterations = config["five_point_refine_match_iterations"]
T = multiview.relative_pose_optimize_nonlinear(
b1[inliers], b2[inliers], T[:3, 3], T[:3, :3], iterations
)
inliers = compute_inliers_bearings(b1, b2, T[:, :3], T[:, 3], threshold)
return matches[inliers]
def robust_match(
p1: np.ndarray,
p2: np.ndarray,
camera1: pygeometry.Camera,
camera2: pygeometry.Camera,
matches: np.ndarray,
config: Dict[str, Any],
) -> np.ndarray:
"""Filter matches by fitting a geometric model.
If cameras are perspective without distortion, then the Fundamental
matrix is used. Otherwise, we use the Essential matrix.
"""
if (
camera1.projection_type in ["perspective", "brown"]
and camera1.k1 == 0.0
and camera1.k2 == 0.0
and camera2.projection_type in ["perspective", "brown"]
and camera2.k1 == 0.0
and camera2.k2 == 0.0
):
return robust_match_fundamental(p1, p2, matches, config)[1]
else:
return robust_match_calibrated(p1, p2, camera1, camera2, matches, config)
def unfilter_matches(matches, m1, m2) -> np.ndarray:
"""Given matches and masking arrays, get matches with un-masked indexes."""
i1 = np.flatnonzero(m1)
i2 = np.flatnonzero(m2)
return np.array([(i1[match[0]], i2[match[1]]) for match in matches])
def apply_adhoc_filters(
data: DataSetBase,
matches,
im1: str,
camera1: pygeometry.Camera,
p1: np.ndarray,
im2: str,
camera2: pygeometry.Camera,
p2: np.ndarray,
) -> List[Tuple[int, int]]:
"""Apply a set of filters functions defined further below
for removing static data in images.
"""
matches = _non_static_matches(p1, p2, matches)
matches = _not_on_pano_poles_matches(p1, p2, matches, camera1, camera2)
matches = _not_on_vermont_watermark(p1, p2, matches, im1, im2, data)
matches = _not_on_blackvue_watermark(p1, p2, matches, im1, im2, data)
return matches
def _non_static_matches(
p1: np.ndarray, p2: np.ndarray, matches: List[Tuple[int, int]]
) -> List[Tuple[int, int]]:
"""Remove matches with same position in both images.
That should remove matches on that are likely belong to rig occluders,
watermarks or dust, but not discard entirely static images.
"""
threshold = 0.001
res = []
for match in matches:
d = p1[match[0]] - p2[match[1]]
if d[0] ** 2 + d[1] ** 2 >= threshold ** 2:
res.append(match)
static_ratio_threshold = 0.85
static_ratio_removed = 1 - len(res) / max(len(matches), 1)
if static_ratio_removed > static_ratio_threshold:
return matches
else:
return res
def _not_on_pano_poles_matches(
p1: np.ndarray,
p2: np.ndarray,
matches: List[Tuple[int, int]],
camera1: pygeometry.Camera,
camera2: pygeometry.Camera,
) -> List[Tuple[int, int]]:
"""Remove matches for features that are too high or to low on a pano.
That should remove matches on the sky and and carhood part of panoramas
"""
min_lat = -0.125
max_lat = 0.125
is_pano1 = pygeometry.Camera.is_panorama(camera1.projection_type)
is_pano2 = pygeometry.Camera.is_panorama(camera2.projection_type)
if is_pano1 or is_pano2:
res = []
for match in matches:
if (not is_pano1 or min_lat < p1[match[0]][1] < max_lat) and (
not is_pano2 or min_lat < p2[match[1]][1] < max_lat
):
res.append(match)
return res
else:
return matches
def _not_on_vermont_watermark(
p1: np.ndarray,
p2: np.ndarray,
matches: List[Tuple[int, int]],
im1: str,
im2: str,
data: DataSetBase,
) -> List[Tuple[int, int]]:
"""Filter Vermont images watermark."""
meta1 = data.load_exif(im1)
meta2 = data.load_exif(im2)
if meta1["make"] == "VTrans_Camera" and meta1["model"] == "VTrans_Camera":
matches = [m for m in matches if _vermont_valid_mask(p1[m[0]])]
if meta2["make"] == "VTrans_Camera" and meta2["model"] == "VTrans_Camera":
matches = [m for m in matches if _vermont_valid_mask(p2[m[1]])]
return matches
def _vermont_valid_mask(p: np.ndarray) -> bool:
"""Check if pixel inside the valid region.
Pixel coord Y should be larger than 50.
In normalized coordinates y > (50 - h / 2) / w
"""
return p[1] > -0.255
def _not_on_blackvue_watermark(
p1: np.ndarray, p2: np.ndarray, matches, im1: str, im2: str, data: DataSetBase
) -> List[Tuple[int, int]]:
"""Filter Blackvue's watermark."""
meta1 = data.load_exif(im1)
meta2 = data.load_exif(im2)
if meta1["make"].lower() == "blackvue":
matches = [m for m in matches if _blackvue_valid_mask(p1[m[0]])]
if meta2["make"].lower() == "blackvue":
matches = [m for m in matches if _blackvue_valid_mask(p2[m[1]])]
return matches
def _blackvue_valid_mask(p: np.ndarray) -> bool:
"""Check if pixel inside the valid region.
Pixel coord Y should be smaller than h - 70.
In normalized coordinates y < (h - 70 - h / 2) / w,
with h = 2160 and w = 3840
"""
return p[1] < 0.263