# opensfm/pairs_selection.py
import copy
import logging
import math
from collections import defaultdict
from itertools import combinations
from typing import Optional, Tuple, List, Set, Dict, Iterable, Any
import numpy as np
import scipy.spatial as spatial
from opensfm import bow, context, feature_loader, vlad, geo, geometry
from opensfm.dataset_base import DataSetBase
logger = logging.getLogger(__name__)
def has_gps_info(exif: Dict[str, Any]) -> bool:
    return bool(
        exif
        and "gps" in exif
        and "latitude" in exif["gps"]
        and "longitude" in exif["gps"]
    )
def sorted_pair(im1: str, im2: str) -> Tuple[str, str]:
if im1 < im2:
return im1, im2
else:
return im2, im1
def get_gps_point(
exif: Dict[str, Any], reference: geo.TopocentricConverter
) -> Tuple[np.ndarray, np.ndarray]:
"""Return GPS-based representative point. Direction is returned as Z oriented (vertical assumption)"""
gps = exif["gps"]
altitude = 0
direction = np.array([0, 0, 1])
return (
reference.to_topocentric(gps["latitude"], gps["longitude"], altitude),
direction,
)
DEFAULT_Z = 1.0
MAXIMUM_Z = 8000
SAMPLE_Z = 100
def sign(x: float) -> float:
    """Return +1.0 for strictly positive values, -1.0 otherwise."""
    return 1.0 if x > 0.0 else -1.0
def get_gps_opk_point(
exif: Dict[str, Any], reference: geo.TopocentricConverter
) -> Tuple[np.ndarray, np.ndarray]:
"""Return GPS-based representative point."""
opk = exif["opk"]
omega, phi, kappa = (
math.radians(opk["omega"]),
math.radians(opk["phi"]),
math.radians(opk["kappa"]),
)
R_camera = geometry.rotation_from_opk(omega, phi, kappa)
z_axis = R_camera[2]
origin = get_gps_point(exif, reference)
return origin[0], z_axis / (sign(z_axis[2]) * z_axis[2]) * DEFAULT_Z
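
# Note on the rescaling above: z_axis / (sign(z_axis[2]) * z_axis[2]) has a
# vertical component of exactly +/-1, so the returned direction is homogenized
# to |z| = DEFAULT_Z, the form find_best_altitude() expects when scaling all
# viewing rays by a common altitude.
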
def find_best_altitude(
    origin: Dict[str, np.ndarray], directions: Dict[str, np.ndarray]
) -> float:
    """Find the altitude that minimizes the X/Y bounding box. Domain is [0, MAXIMUM_Z].

    'origin' contains per-image positions in world coordinates.
    'directions' contains viewing directions, expected to be homogenized with z=1.

    We sample the altitude every SAMPLE_Z and regress a second-degree polynomial,
    from which we take the extremum.
    """
directions_base = np.array([p for p in directions.values()])
origin_base = np.array([p for p in origin.values()])
samples_x, samples_y = [], []
for current_z in range(1, MAXIMUM_Z, SAMPLE_Z):
scaled = origin_base + directions_base / DEFAULT_Z * current_z
current_size = (np.max(scaled[:, 0]) - np.min(scaled[:, 0])) ** 2 + (
np.max(scaled[:, 1]) - np.min(scaled[:, 1])
) ** 2
samples_x.append(current_z)
samples_y.append(current_size)
coeffs = np.polyfit(samples_x, samples_y, 2)
extrema = -coeffs[1] / (2 * coeffs[0])
    if extrema < 0:
        logger.info(
            f"Altitude is negative ({extrema}): viewing directions are probably divergent. Using default altitude of {DEFAULT_Z}"
        )
        extrema = DEFAULT_Z
return extrema
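
# Illustrative example for find_best_altitude (hypothetical numbers): two views
# 10 units apart whose rays converge at z = 100. The X/Y extent shrinks
# linearly until the rays cross, so the squared extent is exactly quadratic in
# z and the fitted parabola's vertex -b / (2 * a) recovers the crossing
# altitude:
#
#   >>> origin = {"a": np.array([0.0, 0.0, 0.0]), "b": np.array([10.0, 0.0, 0.0])}
#   >>> directions = {"a": np.array([0.05, 0.0, 1.0]), "b": np.array([-0.05, 0.0, 1.0])}
#   >>> abs(find_best_altitude(origin, directions) - 100.0) < 1.0
#   True
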
def get_representative_points(
images: List[str], exifs: Dict[str, Any], reference: geo.TopocentricConverter
) -> Dict[str, np.ndarray]:
"""Return a topocentric point for each image, that is suited to run distance-based pair selection."""
origin = {}
directions = {}
    # GPS / OPK / YPR method table
map_method = {
(True, False, False): get_gps_point,
(True, True, False): get_gps_opk_point,
# (True, False, True): add YPR method here
}
had_orientation = False
for image in images:
exif = exifs[image]
        has_gps = has_gps_info(exif)
if not has_gps:
continue
has_opk = "opk" in exif
has_ypr = "ypr" in exif
had_orientation |= has_opk or has_ypr
method_id = (has_gps, has_opk, has_ypr)
if method_id not in map_method:
raise RuntimeError(
f"GPS / OPK / YPR {has_gps, has_opk, has_ypr} tag combination unsupported"
)
origin[image], directions[image] = map_method[method_id](exif, reference)
if had_orientation:
altitude = find_best_altitude(origin, directions)
logger.info(f"Altitude for orientation based matching {altitude}")
directions_scaled = {k: v / DEFAULT_Z * altitude for k, v in directions.items()}
points = {k: origin[k] + directions_scaled[k] for k in images}
else:
points = origin
return points
def match_candidates_by_distance(
images_ref: List[str],
images_cand: List[str],
exifs: Dict[str, Any],
reference: geo.TopocentricConverter,
max_neighbors: int,
max_distance: float,
) -> Set[Tuple[str, str]]:
"""Find candidate matching pairs by GPS distance.
The GPS altitude is ignored because we want images of the same position
at different altitudes to be matched together. Otherwise, for drone
datasets, flights at different altitudes do not get matched.
"""
if len(images_cand) == 0:
return set()
if max_neighbors <= 0 and max_distance <= 0:
return set()
max_neighbors = max_neighbors or 99999999
max_distance = max_distance or 99999999.0
k = min(len(images_cand), max_neighbors)
representative_points = get_representative_points(
images_cand + images_ref, exifs, reference
)
    # we don't want to lose images because of missing GPS:
    # either ALL of them or NONE of them are used for getting pairs
difference = abs(len(representative_points) - len(set(images_cand + images_ref)))
if difference > 0:
logger.warning(f"Couldn't fetch {difference} images. Returning NO pairs.")
return set()
    # Build the KD-tree over candidate points only: padding the array to the
    # size of 'representative_points' would insert spurious points at the origin.
    points = np.zeros((len(images_cand), 3))
    for i, point_id in enumerate(images_cand):
        points[i] = representative_points[point_id]
tree = spatial.cKDTree(points)
pairs = set()
for image_ref in images_ref:
nn = k + 1 if image_ref in images_cand else k
point = representative_points[image_ref]
distances, neighbors = tree.query(
point, k=nn, distance_upper_bound=max_distance
)
        # query() returns scalars when a single neighbor is requested
        neighbors = np.atleast_1d(neighbors)
for j in neighbors:
if j >= len(images_cand):
continue
image_cand = images_cand[j]
if image_cand != image_ref:
pairs.add(sorted_pair(image_ref, image_cand))
return pairs
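
# Illustrative usage sketch (hypothetical image names; assumes a topocentric
# reference at latitude 52.0, longitude 13.0, altitude 0.0):
#
#   >>> reference = geo.TopocentricConverter(52.0, 13.0, 0.0)
#   >>> exifs = {
#   ...     "a.jpg": {"gps": {"latitude": 52.0, "longitude": 13.0}},
#   ...     "b.jpg": {"gps": {"latitude": 52.0001, "longitude": 13.0}},
#   ... }
#   >>> match_candidates_by_distance(["a.jpg"], ["a.jpg", "b.jpg"], exifs,
#   ...                              reference, max_neighbors=1, max_distance=0)
#   {('a.jpg', 'b.jpg')}
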
def norm_2d(vec: np.ndarray) -> float:
    """Return the 2D (X/Y) norm of a vector."""
    return math.sqrt(vec[0] ** 2 + vec[1] ** 2)
def match_candidates_by_graph(
    images_ref: List[str],
    images_cand: List[str],
    exifs: Dict[str, Any],
    reference: geo.TopocentricConverter,
    rounds: int,
) -> Set[Tuple[str, str]]:
    """Find candidate matching pairs by triangulating GPS points on the X/Y plane."""
if len(images_cand) == 0 or rounds < 1:
return set()
images_cand_set = set(images_cand)
images_ref_set = set(images_ref)
images = list(images_cand_set | images_ref_set)
representative_points = get_representative_points(images, exifs, reference)
points = np.zeros((len(images), 2))
for i, point in enumerate(representative_points.values()):
points[i] = point[0:2]
def produce_edges(triangles):
for triangle in triangles:
for vertex1, vertex2 in combinations(triangle, 2):
image1, image2 = images[vertex1], images[vertex2]
if image1 == image2:
continue
pair_way1 = image1 in images_cand_set and image2 in images_ref_set
pair_way2 = image2 in images_cand_set and image1 in images_ref_set
if pair_way1 or pair_way2:
yield sorted_pair(image1, image2), (vertex1, vertex2)
pairs = set()
    # first round: compute a scale from the Delaunay edge lengths (and collect the edges as pairs)
edge_distances = []
triangles = spatial.Delaunay(points).simplices
for (image1, image2), (vertex1, vertex2) in produce_edges(triangles):
pairs.add((image1, image2))
edge_distances.append(norm_2d(points[vertex1] - points[vertex2]))
scale = np.median(edge_distances)
    # Further rounds produce edges from jittered versions of the original points
    # in order to get 'alternative' Delaunay triangulations: a perfect square
    # will only produce one of its two diagonals, so by jittering it, we get a
    # chance of flipping the diagonal and thus more pair diversity.
for _ in range(rounds):
points_current = copy.copy(points) + np.random.rand(*points.shape) * scale
triangles = spatial.Delaunay(points_current).simplices
for (image1, image2), _ in produce_edges(triangles):
pairs.add((image1, image2))
return pairs
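
# Worked example of the jittering rationale: the Delaunay triangulation of the
# four corners of a square contains two triangles sharing a single diagonal,
# i.e. only 5 of the 6 possible pairs. Each jittered round may flip which
# diagonal gets triangulated, so after a few rounds both diagonals (and thus
# all 6 pairs) are likely to be produced.
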
def match_candidates_with_bow(
data: DataSetBase,
images_ref: List[str],
images_cand: List[str],
exifs: Dict[str, Any],
reference: geo.TopocentricConverter,
max_neighbors: int,
max_gps_distance: float,
max_gps_neighbors: int,
enforce_other_cameras: bool,
) -> Dict[Tuple[str, str], float]:
"""Find candidate matching pairs using BoW-based distance.
If max_gps_distance > 0, then we use first restrain a set of
candidates using max_gps_neighbors neighbors selected using
GPS distance.
If enforce_other_cameras is True, we keep max_neighbors images
with same cameras AND max_neighbors images from any other different
camera.
"""
if max_neighbors <= 0:
return {}
results = compute_bow_affinity(
data,
images_ref,
images_cand,
exifs,
reference,
max_gps_distance,
max_gps_neighbors,
)
return construct_pairs(results, max_neighbors, exifs, enforce_other_cameras)
def compute_bow_affinity(
data: DataSetBase,
images_ref: List[str],
images_cand: List[str],
exifs: Dict[str, Any],
reference: geo.TopocentricConverter,
max_gps_distance: float,
max_gps_neighbors: int,
) -> List[Tuple[str, List[float], List[str]]]:
"""Compute afinity scores between references and candidates
images using BoW-based distance.
"""
preempted_candidates, need_load = preempt_candidates(
images_ref, images_cand, exifs, reference, max_gps_neighbors, max_gps_distance
)
# construct BoW histograms
logger.info("Computing %d BoW histograms" % len(need_load))
histograms = load_histograms(data, need_load)
    # parallel BoW neighbors computation
args, processes, batch_size = create_parallel_matching_args(
data, preempted_candidates, histograms
)
logger.info("Computing BoW candidates with %d processes" % processes)
return context.parallel_map(match_bow_unwrap_args, args, processes, batch_size)
def match_candidates_with_vlad(
data: DataSetBase,
images_ref: List[str],
images_cand: List[str],
exifs: Dict[str, Any],
reference: geo.TopocentricConverter,
max_neighbors: int,
max_gps_distance: float,
max_gps_neighbors: int,
enforce_other_cameras: bool,
histograms: Dict[str, np.ndarray],
) -> Dict[Tuple[str, str], float]:
"""Find candidate matching pairs using VLAD-based distance.
If max_gps_distance > 0, then we use first restrain a set of
candidates using max_gps_neighbors neighbors selected using
GPS distance.
If enforce_other_cameras is True, we keep max_neighbors images
with same cameras AND max_neighbors images from any other different
camera.
If enforce_other_cameras is False, we keep max_neighbors images
from all cameras.
Pre-computed VLAD histograms can be passed as a dictionary.
Missing histograms will be computed and added to it.
"""
if max_neighbors <= 0:
return {}
results = compute_vlad_affinity(
data,
images_ref,
images_cand,
exifs,
reference,
max_gps_distance,
max_gps_neighbors,
histograms,
)
return construct_pairs(results, max_neighbors, exifs, enforce_other_cameras)
def compute_vlad_affinity(
data: DataSetBase,
images_ref: List[str],
images_cand: List[str],
exifs: Dict[str, Any],
reference: geo.TopocentricConverter,
max_gps_distance: float,
max_gps_neighbors: int,
histograms: Dict[str, np.ndarray],
) -> List[Tuple[str, List[float], List[str]]]:
"""Compute afinity scores between references and candidates
images using VLAD-based distance.
"""
preempted_candidates, need_load = preempt_candidates(
images_ref, images_cand, exifs, reference, max_gps_neighbors, max_gps_distance
)
    if len(preempted_candidates) == 0:
        logger.warning(
            f"Couldn't preempt any candidate with GPS, using ALL {len(images_cand)} images as candidates"
        )
preempted_candidates = {image: images_cand for image in images_ref}
need_load = set(images_ref + images_cand)
# construct VLAD histograms
need_load = {im for im in need_load if im not in histograms}
logger.info("Computing %d VLAD histograms" % len(need_load))
histograms.update(vlad_histograms(need_load, data))
# parallel VLAD neighbors computation
args, processes, batch_size = create_parallel_matching_args(
data, preempted_candidates, histograms
)
logger.info("Computing VLAD candidates with %d processes" % processes)
return context.parallel_map(match_vlad_unwrap_args, args, processes, batch_size)
def preempt_candidates(
images_ref: List[str],
images_cand: List[str],
exifs: Dict[str, Any],
reference: geo.TopocentricConverter,
max_gps_neighbors: int,
max_gps_distance: float,
) -> Tuple[Dict[str, list], Set[str]]:
"""Preempt candidates using GPS to reduce set of images
from which to load data to save RAM.
"""
# preempt candidates images using GPS
preempted_cand = {im: images_cand for im in images_ref}
if max_gps_distance > 0 or max_gps_neighbors > 0:
gps_pairs = match_candidates_by_distance(
images_ref,
images_cand,
exifs,
reference,
max_gps_neighbors,
max_gps_distance,
)
preempted_cand = defaultdict(list)
for p in gps_pairs:
if p[0] in images_ref:
preempted_cand[p[0]].append(p[1])
if p[1] in images_ref:
preempted_cand[p[1]].append(p[0])
# reduce sets of images from which to load histograms (RAM saver)
need_load = set(preempted_cand.keys())
for k, v in preempted_cand.items():
need_load.update(v)
need_load.add(k)
return preempted_cand, need_load
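
# The returned mapping has one entry per reference image, e.g. (hypothetical
# names) {"a.jpg": ["b.jpg", "c.jpg"]}, while 'need_load' is the flat set of
# all images involved, here {"a.jpg", "b.jpg", "c.jpg"}.
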
def construct_pairs(
results: List[Tuple[str, List[float], List[str]]],
max_neighbors: int,
exifs: Dict[str, Any],
enforce_other_cameras: bool,
) -> Dict[Tuple[str, str], float]:
"""Construct final sets of pairs to match"""
pairs = {}
for im, distances, other in results:
order = np.argsort(distances)
if enforce_other_cameras:
pairs.update(
pairs_from_neighbors(im, exifs, distances, order, other, max_neighbors)
)
else:
for i in order[:max_neighbors]:
pairs[sorted_pair(im, other[i])] = distances[i]
return pairs
def create_parallel_matching_args(
data: DataSetBase,
preempted_cand: Dict[str, list],
histograms: Dict[str, np.ndarray],
) -> Tuple[List[Tuple[str, list, Dict[str, np.ndarray]]], int, int]:
"""Create arguments to matching function"""
args = [(im, cands, histograms) for im, cands in preempted_cand.items()]
# parallel VLAD neighbors computation
per_process = 512
processes = context.processes_that_fit_in_memory(
data.config["processes"], per_process
)
batch_size = max(1, len(args) // (2 * processes))
return args, processes, batch_size
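
# Example of the batching arithmetic above: with 64 preempted reference images
# and 4 processes, batch_size = max(1, 64 // (2 * 4)) = 8, i.e. roughly two
# batches per process.
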
def match_bow_unwrap_args(
args: Tuple[str, Iterable[str], Dict[str, np.ndarray]]
) -> Tuple[str, List[float], List[str]]:
"""Wrapper for parralel processing of BoW"""
image, other_images, histograms = args
return bow_distances(image, other_images, histograms)
def match_vlad_unwrap_args(
args: Tuple[str, Iterable[str], Dict[str, np.ndarray]]
) -> Tuple[str, List[float], List[str]]:
"""Wrapper for parralel processing of VLAD"""
image, other_images, histograms = args
return vlad.vlad_distances(image, other_images, histograms)
def match_candidates_by_time(
images_ref: List[str],
images_cand: List[str],
exifs: Dict[str, Any],
max_neighbors: int,
) -> Set[Tuple[str, str]]:
"""Find candidate matching pairs by time difference."""
if max_neighbors <= 0:
return set()
k = min(len(images_cand), max_neighbors)
times = np.zeros((len(images_cand), 1))
for i, image in enumerate(images_cand):
times[i] = exifs[image]["capture_time"]
tree = spatial.cKDTree(times)
pairs = set()
for image_ref in images_ref:
nn = k + 1 if image_ref in images_cand else k
time = exifs[image_ref]["capture_time"]
distances, neighbors = tree.query([time], k=nn)
        # query() returns scalars when a single neighbor is requested
        neighbors = np.atleast_1d(neighbors)
for j in neighbors:
if j >= len(images_cand):
continue
image_cand = images_cand[j]
if image_ref != image_cand:
pairs.add(sorted_pair(image_ref, image_cand))
return pairs
def match_candidates_by_order(
images_ref: List[str], images_cand: List[str], max_neighbors: int
) -> Set[Tuple[str, str]]:
"""Find candidate matching pairs by sequence order."""
if max_neighbors <= 0:
return set()
n = (max_neighbors + 1) // 2
pairs = set()
for i, image_ref in enumerate(images_ref):
a = max(0, i - n)
b = min(len(images_cand), i + n)
for j in range(a, b):
image_cand = images_cand[j]
if image_ref != image_cand:
pairs.add(sorted_pair(image_ref, image_cand))
return pairs
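
# Example of the window arithmetic: with max_neighbors=2, n = (2 + 1) // 2 = 1,
# so each reference image at index i is paired with candidates at indices
# [i - 1, i]; a sequence [im0, im1, im2] yields (im0, im1) and (im1, im2).
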
def match_candidates_from_metadata(
images_ref: List[str],
images_cand: List[str],
exifs: Dict[str, Any],
data: DataSetBase,
config_override: Dict[str, Any],
) -> Tuple[List[Tuple[str, str]], Dict[str, Any]]:
"""Compute candidate matching pairs between between images_ref and images_cand
Returns a list of pairs (im1, im2) such that (im1 in images_ref) is true.
Returned pairs are unique given that (i, j) == (j, i).
"""
overriden_config = data.config.copy()
overriden_config.update(config_override)
max_distance = overriden_config["matching_gps_distance"]
gps_neighbors = overriden_config["matching_gps_neighbors"]
graph_rounds = overriden_config["matching_graph_rounds"]
time_neighbors = overriden_config["matching_time_neighbors"]
order_neighbors = overriden_config["matching_order_neighbors"]
bow_neighbors = overriden_config["matching_bow_neighbors"]
bow_gps_distance = overriden_config["matching_bow_gps_distance"]
bow_gps_neighbors = overriden_config["matching_bow_gps_neighbors"]
bow_other_cameras = overriden_config["matching_bow_other_cameras"]
vlad_neighbors = overriden_config["matching_vlad_neighbors"]
vlad_gps_distance = overriden_config["matching_vlad_gps_distance"]
vlad_gps_neighbors = overriden_config["matching_vlad_gps_neighbors"]
vlad_other_cameras = overriden_config["matching_vlad_other_cameras"]
data.init_reference()
reference = data.load_reference()
    if not all(map(has_gps_info, exifs.values())):
        if gps_neighbors != 0:
            logger.warning(
                "Not all images have GPS info. Disabling matching_gps_neighbors."
            )
        gps_neighbors = 0
        max_distance = 0
        graph_rounds = 0
images_ref.sort()
if (
max_distance
== gps_neighbors
== time_neighbors
== order_neighbors
== bow_neighbors
== vlad_neighbors
== graph_rounds
== 0
):
# All pair selection strategies deactivated so we match all pairs
d = set()
t = set()
g = set()
o = set()
b = set()
v = set()
pairs = {sorted_pair(i, j) for i in images_ref for j in images_cand if i != j}
else:
d = match_candidates_by_distance(
images_ref, images_cand, exifs, reference, gps_neighbors, max_distance
)
g = match_candidates_by_graph(
images_ref, images_cand, exifs, reference, graph_rounds
)
t = match_candidates_by_time(images_ref, images_cand, exifs, time_neighbors)
o = match_candidates_by_order(images_ref, images_cand, order_neighbors)
b = match_candidates_with_bow(
data,
images_ref,
images_cand,
exifs,
reference,
bow_neighbors,
bow_gps_distance,
bow_gps_neighbors,
bow_other_cameras,
)
v = match_candidates_with_vlad(
data,
images_ref,
images_cand,
exifs,
reference,
vlad_neighbors,
vlad_gps_distance,
vlad_gps_neighbors,
vlad_other_cameras,
{},
)
pairs = d | g | t | o | set(b) | set(v)
pairs = ordered_pairs(pairs, images_ref)
report = {
"num_pairs_distance": len(d),
"num_pairs_graph": len(g),
"num_pairs_time": len(t),
"num_pairs_order": len(o),
"num_pairs_bow": len(b),
"num_pairs_vlad": len(v),
}
return pairs, report
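
# Sketch of a typical call from the matching pipeline (assuming 'data' is any
# DataSetBase implementation; names are illustrative):
#
#   images = data.images()
#   exifs = {im: data.load_exif(im) for im in images}
#   pairs, report = match_candidates_from_metadata(
#       list(images), list(images), exifs, data, config_override={}
#   )
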
def bow_distances(
image: str, other_images: Iterable[str], histograms: Dict[str, np.ndarray]
) -> Tuple[str, List[float], List[str]]:
"""Compute BoW-based distance (L1 on histogram of words)
between an image and other images.
"""
if image not in histograms:
return image, [], []
distances = []
other = []
h = histograms[image]
for im2 in other_images:
if im2 != image and im2 in histograms:
h2 = histograms[im2]
distances.append(np.fabs(h - h2).sum())
other.append(im2)
return image, distances, other
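
# Example of the L1 distance above (hypothetical 2-bin histograms):
#
#   >>> hists = {"a": np.array([0.2, 0.8]), "b": np.array([0.5, 0.5])}
#   >>> image, distances, other = bow_distances("a", ["b"], hists)
#   >>> other, round(float(distances[0]), 6)
#   (['b'], 0.6)
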
def load_histograms(data: DataSetBase, images: Iterable[str]) -> Dict[str, np.ndarray]:
"""Load BoW histograms of given images"""
min_num_feature = 8
histograms = {}
bows = bow.load_bows(data.config)
for im in images:
filtered_words = feature_loader.instance.load_words(data, im, masked=True)
if filtered_words is None:
logger.error("No words in image {}".format(im))
continue
if len(filtered_words) <= min_num_feature:
logger.warning(
"Too few filtered features in image {}: {}".format(
im, len(filtered_words)
)
)
continue
histograms[im] = bows.histogram(filtered_words[:, 0])
return histograms
def vlad_histogram_unwrap_args(
args: Tuple[DataSetBase, str]
) -> Optional[Tuple[str, np.ndarray]]:
"""Helper function for multithreaded VLAD computation.
Returns the image and its descriptor.
"""
data, image = args
vlad_descriptor = vlad.instance.vlad_histogram(data, image)
if vlad_descriptor is not None:
return image, vlad_descriptor
else:
logger.warning(f"Couln't compute VLAD descriptor for image {image}")
return None
def vlad_histograms(images: Iterable[str], data: DataSetBase) -> Dict[str, np.ndarray]:
"""Construct VLAD histograms from the image features.
Returns a dictionary of VLAD vectors for the images.
"""
batch_size = 4
vlads = context.parallel_map(
vlad_histogram_unwrap_args,
[(data, image) for image in images],
data.config["processes"],
batch_size,
)
return {v[0]: v[1] for v in vlads if v}
def pairs_from_neighbors(
image: str,
exifs: Dict[str, Any],
distances: List[float],
order: List[int],
other: List[str],
max_neighbors: int,
) -> Dict[Tuple[str, str], float]:
"""Construct matching pairs given closest ordered neighbors.
Pairs will of form (image, im2), im2 being the closest max_neighbors
given by (order, other) having the same cameras OR the closest max_neighbors
having from any other camera.
"""
same_camera, other_cameras = [], []
for i in order:
im2 = other[i]
d = distances[i]
if exifs[im2]["camera"] == exifs[image]["camera"]:
if len(same_camera) < max_neighbors:
same_camera.append((im2, d))
else:
if len(other_cameras) < max_neighbors:
other_cameras.append((im2, d))
if len(same_camera) + len(other_cameras) >= 2 * max_neighbors:
break
    pairs = {}
    for im2, d in same_camera + other_cameras:
        pairs[sorted_pair(image, im2)] = d
return pairs
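
# Illustrative example (hypothetical data): with max_neighbors=1, the closest
# same-camera neighbor AND the closest other-camera neighbor are both kept:
#
#   >>> exifs = {"q": {"camera": "c1"}, "a": {"camera": "c1"},
#   ...          "b": {"camera": "c1"}, "c": {"camera": "c2"}}
#   >>> pairs_from_neighbors("q", exifs, [1.0, 2.0, 3.0], [0, 1, 2],
#   ...                      ["a", "b", "c"], max_neighbors=1)
#   {('a', 'q'): 1.0, ('c', 'q'): 3.0}
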
def ordered_pairs(
pairs: Set[Tuple[str, str]], images_ref: List[str]
) -> List[Tuple[str, str]]:
"""Image pairs that need matching skipping duplicates.
Returns a list of pairs (im1, im2) such that (im1 in images_ref) is true.
"""
per_image = defaultdict(list)
for im1, im2 in pairs:
per_image[im1].append(im2)
per_image[im2].append(im1)
ordered = set()
remaining = set(images_ref)
if len(remaining) > 0:
next_image = remaining.pop()
while next_image:
im1 = next_image
next_image = None
for im2 in per_image[im1]:
if (im2, im1) not in ordered:
ordered.add((im1, im2))
if not next_image and im2 in remaining:
next_image = im2
remaining.remove(im2)
if not next_image and remaining:
next_image = remaining.pop()
return list(ordered)
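
# Illustrative example (hypothetical names): both pairs come back with the
# reference image "b" in first position, whatever their original orientation:
#
#   >>> sorted(ordered_pairs({("a", "b"), ("b", "c")}, ["b"]))
#   [('b', 'a'), ('b', 'c')]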