"""Multi-view geometry utilities.

Plane fitting, homography decomposition, similarity-transform fitting,
and RANSAC wrappers around the OpenSfM native robust estimators.
"""

import math
import random
from typing import Any, Dict, List, Optional, Tuple

import cv2
import numpy as np

from opensfm import pygeometry, pymap, pyrobust
from opensfm import transformations as tf


def nullspace(A: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    """Compute the null space of A.

    Return the smallest singular value and the corresponding vector.
    """
    u, s, vh = np.linalg.svd(A)
    return s[-1], vh[-1]


def homogeneous(x: np.ndarray) -> np.ndarray:
    """Add a column of ones to x."""
    s = x.shape[:-1] + (1,)
    return np.hstack((x, np.ones(s)))


def homogeneous_vec(x: np.ndarray) -> np.ndarray:
    """Add a column of zeros to x."""
    s = x.shape[:-1] + (1,)
    return np.hstack((x, np.zeros(s)))


def euclidean(x: np.ndarray) -> np.ndarray:
    """Divide by last column and drop it."""
    return x[..., :-1] / x[..., -1:]


def cross_product_matrix(x: np.ndarray) -> np.ndarray:
    """Return the matrix representation of x's cross product."""
    return np.array([[0, -x[2], x[1]], [x[2], 0, -x[0]], [-x[1], x[0], 0]])


def P_from_KRt(K: np.ndarray, R: np.ndarray, t: np.ndarray) -> np.ndarray:
    """P = K[R|t]."""
    P = np.empty((3, 4))
    P[:, :3] = np.dot(K, R)
    P[:, 3] = np.dot(K, t)
    return P


def KRt_from_P(P: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Factorize the camera matrix into K,R,t as P = K[R|t].

    >>> K = np.array([[1, 2, 3],
    ...               [0, 4, 5],
    ...               [0, 0, 1]])
    >>> R = np.array([[ 0.57313786, -0.60900664,  0.54829181],
    ...               [ 0.74034884,  0.6716445 , -0.02787928],
    ...               [-0.35127851,  0.42190588,  0.83582225]])
    >>> t = np.array([1, 2, 3])
    >>> P = P_from_KRt(K, R, t)
    >>> KK, RR, tt = KRt_from_P(P)
    >>> np.allclose(K, KK)
    True
    >>> np.allclose(R, RR)
    True
    >>> np.allclose(t, tt)
    True
    """
    K, R = rq(P[:, :3])

    T = np.diag(np.sign(np.diag(K)))  # ensure K has positive diagonal
    K = np.dot(K, T)
    R = np.dot(T, R)
    t = np.linalg.solve(K, P[:, 3])
    if np.linalg.det(R) < 0:  # ensure det(R) = 1
        R = -R
        t = -t
    K /= K[2, 2]  # normalise K
    return K, R, t


def rq(A: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    """Decompose a matrix into a triangular times rotation.

    (from PCV)

    >>> Q = np.array([[ 0.57313786, -0.60900664,  0.54829181],
    ...               [ 0.74034884,  0.6716445 , -0.02787928],
    ...               [-0.35127851,  0.42190588,  0.83582225]])
    >>> R = np.array([[1, 2, 3],
    ...               [0, 4, 5],
    ...               [0, 0, 1]])
    >>> r, q = rq(R.dot(Q))
    >>> np.allclose(r.dot(q), R.dot(Q))
    True
    >>> np.allclose(abs(np.linalg.det(q)), 1.0)
    True
    >>> np.allclose(r[1,0], 0) and np.allclose(r[2,0], 0) and np.allclose(r[2,1], 0)
    True
    """
    # RQ via QR of the flipped transpose.
    Q, R = np.linalg.qr(np.flipud(A).T)
    R = np.flipud(R.T)
    Q = Q.T
    return R[:, ::-1], Q[::-1, :]


def vector_angle(u: np.ndarray, v: np.ndarray) -> float:
    """Angle between two vectors.

    >>> u = [ 0.99500417, -0.33333333, -0.09983342]
    >>> v = [ -0.99500417, +0.33333333, +0.09983342]
    >>> vector_angle(u, u)
    0.0
    >>> np.isclose(vector_angle(u, v), np.pi)
    True
    """
    cos = np.dot(u, v) / math.sqrt(np.dot(u, u) * np.dot(v, v))
    cos = np.clip(cos, -1, 1)  # guard acos domain against rounding
    return math.acos(cos)


def decompose_similarity_transform(
    T: np.ndarray,
) -> Tuple[float, np.ndarray, np.ndarray]:
    """Decompose the similarity transform to scale, rotation and translation."""
    m, n = T.shape[0:2]
    assert m == n
    A, b = T[: (m - 1), : (m - 1)], T[: (m - 1), (m - 1)]
    # For a similarity, A = s * R with det(R) = 1, so det(A) = s^(m-1).
    s = np.linalg.det(A) ** (1.0 / (m - 1))
    return s, A / s, b


def ransac_max_iterations(
    kernel: Any, inliers: np.ndarray, failure_probability: float
) -> float:
    """Number of RANSAC iterations needed for the given inlier ratio.

    Callers must ensure ``inliers`` is non-empty; a zero inlier ratio would
    make the denominator log(1) = 0.
    """
    if len(inliers) >= kernel.num_samples():
        return 0
    inlier_ratio = float(len(inliers)) / kernel.num_samples()
    n = kernel.required_samples
    return math.log(failure_probability) / math.log(1.0 - inlier_ratio ** n)


TRansacSolution = Tuple[np.ndarray, np.ndarray, float]


def ransac(kernel: Any, threshold: float) -> TRansacSolution:
    """Robustly fit a model to data.

    >>> x = np.array([1., 2., 3.])
    >>> y = np.array([2., 4., 7.])
    >>> kernel = TestLinearKernel(x, y)
    >>> model, inliers, error = ransac(kernel, 0.1)
    >>> np.allclose(model, 2.0)
    True
    >>> inliers
    array([0, 1])
    >>> np.allclose(error, 0.1)
    True
    """
    max_iterations = 1000
    best_error = float("inf")
    best_model = None
    best_inliers = []
    i = 0
    while i < max_iterations:
        try:
            samples = kernel.sampling()
        except AttributeError:
            # Kernel has no custom sampler: draw a uniform random subset.
            samples = random.sample(
                range(kernel.num_samples()), kernel.required_samples
            )
        models = kernel.fit(samples)
        for model in models:
            errors = kernel.evaluate(model)
            inliers = np.flatnonzero(np.fabs(errors) < threshold)
            # Truncated loss: each residual contributes at most `threshold`.
            error = np.fabs(errors).clip(0, threshold).sum()
            if len(inliers) and error < best_error:
                best_error = error
                best_model = model
                best_inliers = inliers
                # Shrink the iteration budget as the inlier ratio improves.
                max_iterations = min(
                    max_iterations, ransac_max_iterations(kernel, best_inliers, 0.01)
                )
        i += 1
    return best_model, best_inliers, best_error


class TestLinearKernel:
    """A kernel for the model y = a * x.

    >>> x = np.array([1., 2., 3.])
    >>> y = np.array([2., 4., 7.])
    >>> kernel = TestLinearKernel(x, y)
    >>> models = kernel.fit([0])
    >>> models
    [2.0]
    >>> errors = kernel.evaluate(models[0])
    >>> np.allclose(errors, [0., 0., 1.])
    True
    """

    required_samples = 1

    def __init__(self, x, y):
        self.x = x
        self.y = y

    def num_samples(self):
        return len(self.x)

    def fit(self, samples):
        x = self.x[samples[0]]
        y = self.y[samples[0]]
        return [y / x]

    def evaluate(self, model):
        return self.y - model * self.x


class PlaneKernel:
    """A kernel for estimating plane from on-plane points and vectors."""

    def __init__(
        self, points, vectors, verticals, point_threshold=1.0, vector_threshold=5.0
    ):
        self.points = points
        self.vectors = vectors
        self.verticals = verticals
        self.required_samples = 3
        self.point_threshold = point_threshold
        self.vector_threshold = vector_threshold

    def num_samples(self):
        return len(self.points)

    def sampling(self):
        """Draw a minimal sample: 2 points + 1 vector, or 3 points."""
        samples = {}
        if len(self.vectors) > 0:
            samples["points"] = self.points[
                random.sample(range(len(self.points)), 2), :
            ]
            samples["vectors"] = [
                self.vectors[i] for i in random.sample(range(len(self.vectors)), 1)
            ]
        else:
            # NOTE(review): this branch indexes *columns* while the branch
            # above indexes rows — looks inconsistent; preserved as-is,
            # TODO confirm expected `points` layout with upstream.
            samples["points"] = self.points[
                :, random.sample(range(len(self.points)), 3)
            ]
            samples["vectors"] = None
        return samples

    def fit(self, samples):
        model = fit_plane(samples["points"], samples["vectors"], self.verticals)
        return [model]

    def evaluate(self, model):
        # only evaluate on points
        normal = model[0:3]
        normal_norm = np.linalg.norm(normal) + 1e-10
        point_error = np.abs(model.T.dot(homogeneous(self.points).T)) / normal_norm
        vectors = np.array(self.vectors)
        # NOTE(review): this is the *squared* norm, so the division below
        # does not produce unit vectors — preserved as-is, TODO confirm.
        vector_norm = np.sum(vectors * vectors, axis=1)
        vectors = (vectors.T / vector_norm).T
        # Angular deviation of each vector from the plane (degrees).
        vector_error = abs(
            np.rad2deg(abs(np.arccos(vectors.dot(normal) / normal_norm))) - 90
        )
        # Binarize: inliers get 0 cost, outliers just above the threshold.
        vector_error[vector_error < self.vector_threshold] = 0.0
        vector_error[vector_error >= self.vector_threshold] = self.point_threshold + 0.1
        point_error[point_error < self.point_threshold] = 0.0
        point_error[point_error >= self.point_threshold] = self.point_threshold + 0.1
        errors = np.hstack((point_error, vector_error))
        return errors


def fit_plane_ransac(
    points: np.ndarray,
    vectors: np.ndarray,
    verticals: np.ndarray,
    point_threshold: float = 1.2,
    vector_threshold: float = 5.0,
) -> TRansacSolution:
    """Robustly fit a plane to points and on-plane vectors with RANSAC."""
    # NOTE(review): scales vectors by 180/pi (rad2deg factor) before
    # fitting — preserved as-is, TODO confirm intent upstream.
    vectors = np.array([v / math.pi * 180.0 for v in vectors])
    kernel = PlaneKernel(
        points - points.mean(axis=0),
        vectors,
        verticals,
        point_threshold,
        vector_threshold,
    )
    p, inliers, error = ransac(kernel, point_threshold)
    # Errors are stacked as [points..., vectors...], so inlier indices
    # below num_point are points, the rest are vectors.
    num_point = points.shape[0]
    points_inliers = points[inliers[inliers < num_point], :]
    vectors_inliers = np.array(
        [vectors[i - num_point] for i in inliers[inliers >= num_point]]
    )
    # Refit on all inliers for the final estimate.
    p = fit_plane(
        points_inliers - points_inliers.mean(axis=0), vectors_inliers, verticals
    )
    return p, inliers, error


def fit_plane(
    points: np.ndarray, vectors: Optional[np.ndarray], verticals: Optional[np.ndarray]
) -> np.ndarray:
    """Estimate a plane fron on-plane points and vectors.

    >>> x = [[0,0,0], [1,0,0], [0,1,0]]
    >>> p = fit_plane(x, None, None)
    >>> np.allclose(p, [0,0,1,0]) or np.allclose(p, [0,0,-1,0])
    True
    >>> x = [[0,0,0], [0,1,0]]
    >>> v = [[1,0,0]]
    >>> p = fit_plane(x, v, None)
    >>> np.allclose(p, [0,0,1,0]) or np.allclose(p, [0,0,-1,0])
    True
    >>> vert = [[0,0,1]]
    >>> p = fit_plane(x, v, vert)
    >>> np.allclose(p, [0,0,1,0])
    True
    """
    # (x 1) p = 0
    # (v 0) p = 0
    points = np.array(points)
    s = 1.0 / max(1e-8, points.std())  # Normalize the scale to improve conditioning.
    x = homogeneous(s * points)
    if vectors is not None and len(vectors) > 0:
        v = homogeneous_vec(s * np.array(vectors))
        A = np.vstack((x, v))
    else:
        A = x
    _, p = nullspace(A)
    p[3] /= s

    if np.allclose(p[:3], [0, 0, 0]):
        return np.array([0.0, 0.0, 1.0, 0])

    # Use verticals to decide the sign of p
    if verticals is not None and len(verticals) > 0:
        d = 0
        for vertical in verticals:
            d += p[:3].dot(vertical)
        p *= np.sign(d)
    return p


def plane_horizontalling_rotation(p: np.ndarray) -> Optional[np.ndarray]:
    """Compute a rotation that brings p to z=0

    >>> p = [1.0, 2.0, 3.0]
    >>> R = plane_horizontalling_rotation(p)
    >>> np.allclose(R.dot(p), [0, 0, np.linalg.norm(p)])
    True

    >>> p = [0, 0, 1.0]
    >>> R = plane_horizontalling_rotation(p)
    >>> np.allclose(R.dot(p), [0, 0, np.linalg.norm(p)])
    True

    >>> p = [0, 0, -1.0]
    >>> R = plane_horizontalling_rotation(p)
    >>> np.allclose(R.dot(p), [0, 0, np.linalg.norm(p)])
    True

    >>> p = [1e-14, 1e-14, -1.0]
    >>> R = plane_horizontalling_rotation(p)
    >>> np.allclose(R.dot(p), [0, 0, np.linalg.norm(p)])
    True
    """
    v0 = p[:3]
    v1 = np.array([0.0, 0.0, 1.0])
    angle = tf.angle_between_vectors(v0, v1)
    axis = tf.vector_product(v0, v1)
    if np.linalg.norm(axis) > 0:
        return tf.rotation_matrix(angle, axis)[:3, :3]
    elif angle < 1.0:
        # Already (anti)parallel and pointing up: identity.
        return np.eye(3)
    elif angle > 3.0:
        # Pointing down: flip around the x axis.
        return np.diag([1, -1, -1])
    return None


def fit_similarity_transform(
    p1: np.ndarray, p2: np.ndarray, max_iterations: int = 1000, threshold: float = 1
) -> Tuple[np.ndarray, np.ndarray]:
    """Fit a similarity transform T such as p2 = T . p1
    between two points sets p1 and p2
    """
    # TODO (Yubin): adapt to RANSAC class
    num_points, dim = p1.shape[0:2]

    assert p1.shape[0] == p2.shape[0]

    best_inliers = []
    # NOTE(review): np.array((3, 4)) is the 1-D array [3, 4], not a 3x4
    # matrix; it is always overwritten on the first iteration when
    # max_iterations >= 1 — preserved as-is.
    best_T = np.array((3, 4))

    for _ in range(max_iterations):
        rnd = np.random.permutation(num_points)
        rnd = rnd[0:dim]

        T = tf.affine_matrix_from_points(p1[rnd, :].T, p2[rnd, :].T, shear=False)
        p1h = homogeneous(p1)
        p2h = homogeneous(p2)

        errors = np.sqrt(np.sum((p2h.T - np.dot(T, p1h.T)) ** 2, axis=0))

        inliers = np.argwhere(errors < threshold)[:, 0]

        if len(inliers) >= len(best_inliers):
            best_T = T.copy()
            best_inliers = np.argwhere(errors < threshold)[:, 0]

    # Estimate similarity transform with inliers
    if len(best_inliers) > dim + 3:
        best_T = tf.affine_matrix_from_points(
            p1[best_inliers, :].T, p2[best_inliers, :].T, shear=False
        )
        errors = np.sqrt(np.sum((p2h.T - np.dot(best_T, p1h.T)) ** 2, axis=0))
        best_inliers = np.argwhere(errors < threshold)[:, 0]

    return best_T, best_inliers


def K_from_camera(camera: Dict[str, Any]) -> np.ndarray:
    """Build a normalized pinhole calibration matrix from a camera dict."""
    f = float(camera["focal"])
    return np.array([[f, 0.0, 0.0], [0.0, f, 0.0], [0.0, 0.0, 1.0]])


def focal_from_homography(H: np.ndarray) -> np.ndarray:
    """Solve for w = H w H^t, with w = diag(a, a, b)

    >>> K = np.diag([0.8, 0.8, 1])
    >>> R = cv2.Rodrigues(np.array([0.3, 0, 0]))[0]
    >>> H = K.dot(R).dot(np.linalg.inv(K))
    >>> f = focal_from_homography(3 * H)
    >>> np.allclose(f, 0.8)
    True
    """
    H = H / np.linalg.det(H) ** (1.0 / 3.0)  # normalize to det(H) = 1
    A = np.array(
        [
            [H[0, 0] * H[0, 0] + H[0, 1] * H[0, 1] - 1, H[0, 2] * H[0, 2]],
            [H[0, 0] * H[1, 0] + H[0, 1] * H[1, 1], H[0, 2] * H[1, 2]],
            [H[0, 0] * H[2, 0] + H[0, 1] * H[2, 1], H[0, 2] * H[2, 2]],
            [H[1, 0] * H[1, 0] + H[1, 1] * H[1, 1] - 1, H[1, 2] * H[1, 2]],
            [H[1, 0] * H[2, 0] + H[1, 1] * H[2, 1], H[1, 2] * H[2, 2]],
            [H[2, 0] * H[2, 0] + H[2, 1] * H[2, 1], H[2, 2] * H[2, 2] - 1],
        ]
    )
    _, (a, b) = nullspace(A)
    focal = np.sqrt(a / b)
    return focal


def R_from_homography(
    H: np.ndarray, f1: np.ndarray, f2: np.ndarray
) -> Optional[np.ndarray]:
    """Rotation from a homography between two cameras with focals f1, f2."""
    K1 = np.diag([f1, f1, 1])
    K2 = np.diag([f2, f2, 1])
    K2inv = np.linalg.inv(K2)
    R = K2inv.dot(H).dot(K1)
    R = project_to_rotation_matrix(R)
    return R


def project_to_rotation_matrix(A: np.ndarray) -> Optional[np.ndarray]:
    """Closest rotation matrix to A via SVD, or None if SVD fails."""
    try:
        u, d, vt = np.linalg.svd(A)
    except np.linalg.LinAlgError:  # np.linalg.linalg alias removed in NumPy 2.0
        return None
    return u.dot(vt)


def camera_up_vector(rotation_matrix: np.ndarray) -> np.ndarray:
    """Unit vector pointing to zenit in camera coords.

    :param rotation: camera pose rotation
    """
    return rotation_matrix[:, 2]


def camera_compass_angle(rotation_matrix: np.ndarray) -> float:
    """Compass angle of a camera

    Angle between world's Y axis and camera's Z axis projected
    onto the XY world plane.

    :param rotation: camera pose rotation
    """
    z = rotation_matrix[2, :]  # Camera's Z axis in world coordinates
    angle = np.arctan2(z[0], z[1])
    return np.degrees(angle)


def rotation_matrix_from_up_vector_and_compass(
    up_vector: List[float], compass_angle: float
) -> np.ndarray:
    """Camera rotation given up_vector and compass.

    >>> d = [1, 2, 3]
    >>> angle = -123
    >>> R = rotation_matrix_from_up_vector_and_compass(d, angle)
    >>> np.allclose(np.linalg.det(R), 1.0)
    True
    >>> up = camera_up_vector(R)
    >>> np.allclose(d / np.linalg.norm(d), up)
    True
    >>> np.allclose(camera_compass_angle(R), angle)
    True

    >>> d = [0, 0, 1]
    >>> angle = 123
    >>> R = rotation_matrix_from_up_vector_and_compass(d, angle)
    >>> np.allclose(np.linalg.det(R), 1.0)
    True
    >>> up = camera_up_vector(R)
    >>> np.allclose(d / np.linalg.norm(d), up)
    True
    """
    r3 = np.array(up_vector) / np.linalg.norm(up_vector)
    ez = np.array([0.0, 0.0, 1.0])
    r2 = ez - np.dot(ez, r3) * r3
    r2n = np.linalg.norm(r2)
    if r2n > 1e-8:
        r2 /= r2n
        r1 = np.cross(r2, r3)
    else:  # We are looking to nadir or zenith
        r1 = np.array([1.0, 0.0, 0.0])
        r2 = np.cross(r3, r1)

    compass_rotation = cv2.Rodrigues(np.radians([0.0, 0.0, compass_angle]))[0]
    return np.column_stack([r1, r2, r3]).dot(compass_rotation)


def motion_from_plane_homography(
    H: np.ndarray,
) -> Optional[List[Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]]]:
    """Compute candidate camera motions from a plane-induced homography.

    Returns up to 8 motions.
    The homography is assumed to be in normalized camera coordinates.

    Uses the method of [Faugueras and Lustman 1988]

    [Faugueras and Lustman 1988] Faugeras, Olivier, and F. Lustman.
    “Motion and Structure from Motion in a Piecewise Planar Environment.”
    Report. INRIA, June 1988. https://hal.inria.fr/inria-00075698/document
    """
    u, l, vh = np.linalg.svd(H)
    d1, d2, d3 = l
    s = np.linalg.det(u) * np.linalg.det(vh)

    # Skip the cases where some singular values are nearly equal
    if d1 / d2 < 1.0001 or d2 / d3 < 1.0001:
        return None

    abs_x1 = np.sqrt((d1 ** 2 - d2 ** 2) / (d1 ** 2 - d3 ** 2))
    abs_x3 = np.sqrt((d2 ** 2 - d3 ** 2) / (d1 ** 2 - d3 ** 2))
    possible_x1_x3 = [
        (abs_x1, abs_x3),
        (abs_x1, -abs_x3),
        (-abs_x1, abs_x3),
        (-abs_x1, -abs_x3),
    ]
    solutions = []

    # Case d' > 0
    for x1, x3 in possible_x1_x3:
        sin_theta = (d1 - d3) * x1 * x3 / d2
        cos_theta = (d1 * x3 ** 2 + d3 * x1 ** 2) / d2
        Rp = np.array(
            [[cos_theta, 0, -sin_theta], [0, 1, 0], [sin_theta, 0, cos_theta]]
        )
        tp = (d1 - d3) * np.array([x1, 0, -x3])
        np_ = np.array([x1, 0, x3])
        R = s * np.dot(np.dot(u, Rp), vh)
        t = np.dot(u, tp)
        n = -np.dot(vh.T, np_)
        d = s * d2
        solutions.append((R, t, n, d))

    # Case d' < 0
    for x1, x3 in possible_x1_x3:
        sin_phi = (d1 + d3) * x1 * x3 / d2
        cos_phi = (d3 * x1 ** 2 - d1 * x3 ** 2) / d2
        Rp = np.array([[cos_phi, 0, sin_phi], [0, -1, 0], [sin_phi, 0, -cos_phi]])
        tp = (d1 + d3) * np.array([x1, 0, x3])
        np_ = np.array([x1, 0, x3])
        R = s * np.dot(np.dot(u, Rp), vh)
        t = np.dot(u, tp)
        n = -np.dot(vh.T, np_)
        d = -s * d2
        solutions.append((R, t, n, d))

    return solutions


def absolute_pose_known_rotation_ransac(
    bs: np.ndarray,
    Xs: np.ndarray,
    threshold: float,
    iterations: int,
    probability: float,
) -> np.ndarray:
    """Estimate the translation of a camera with known (identity) rotation.

    NOTE(review): `probability` is currently unused; only `iterations`
    is forwarded to the native estimator.
    """
    params = pyrobust.RobustEstimatorParams()
    params.iterations = iterations
    result = pyrobust.ransac_absolute_pose_known_rotation(
        bs, Xs, threshold, params, pyrobust.RansacType.RANSAC
    )
    # Convert the estimated camera center to a world-to-camera translation.
    t = -result.lo_model.copy()
    R = np.identity(3)
    return np.concatenate((R, [[t[0]], [t[1]], [t[2]]]), axis=1)


def absolute_pose_ransac(
    bs: np.ndarray,
    Xs: np.ndarray,
    threshold: float,
    iterations: int,
    probability: float,
) -> np.ndarray:
    """Estimate an absolute camera pose [R|t] with RANSAC.

    NOTE(review): `probability` is currently unused.
    """
    params = pyrobust.RobustEstimatorParams()
    params.iterations = iterations
    result = pyrobust.ransac_absolute_pose(
        bs, Xs, threshold, params, pyrobust.RansacType.RANSAC
    )

    # Invert the pose: [R|t] -> [R^T | -R^T t].
    Rt = result.lo_model.copy()
    R, t = Rt[:3, :3].copy(), Rt[:, 3].copy()
    Rt[:3, :3] = R.T
    Rt[:, 3] = -R.T.dot(t)
    return Rt


def relative_pose_ransac(
    b1: np.ndarray,
    b2: np.ndarray,
    threshold: float,
    iterations: int,
    probability: float,
) -> np.ndarray:
    """Estimate the relative pose between two bearing sets with RANSAC.

    NOTE(review): `probability` is currently unused.
    """
    params = pyrobust.RobustEstimatorParams()
    params.iterations = iterations
    result = pyrobust.ransac_relative_pose(
        b1, b2, threshold, params, pyrobust.RansacType.RANSAC
    )

    # Invert the pose: [R|t] -> [R^T | -R^T t].
    Rt = result.lo_model.copy()
    R, t = Rt[:3, :3].copy(), Rt[:, 3].copy()
    Rt[:3, :3] = R.T
    Rt[:, 3] = -R.T.dot(t)
    return Rt


def relative_pose_ransac_rotation_only(
    b1: np.ndarray,
    b2: np.ndarray,
    threshold: float,
    iterations: int,
    probability: float,
) -> np.ndarray:
    """Estimate a rotation-only relative pose with RANSAC.

    NOTE(review): `probability` is currently unused.
    """
    params = pyrobust.RobustEstimatorParams()
    params.iterations = iterations
    result = pyrobust.ransac_relative_rotation(
        b1, b2, threshold, params, pyrobust.RansacType.RANSAC
    )
    return result.lo_model.T


def relative_pose_optimize_nonlinear(
    b1: np.ndarray, b2: np.ndarray, t: np.ndarray, R: np.ndarray, iterations: int
) -> np.ndarray:
    """Refine a relative pose with nonlinear optimization."""
    Rt = np.zeros((3, 4))
    Rt[:3, :3] = R.T
    Rt[:, 3] = -R.T.dot(t)
    Rt_refined = pygeometry.relative_pose_refinement(Rt, b1, b2, iterations)

    # Invert back to the caller's convention.
    R, t = Rt_refined[:3, :3].copy(), Rt_refined[:, 3].copy()
    Rt[:3, :3] = R.T
    Rt[:, 3] = -R.T.dot(t)
    return Rt


def triangulate_gcp(
    point: pymap.GroundControlPoint,
    shots: Dict[str, pymap.Shot],
    reproj_threshold: float,
    min_ray_angle_degrees: float,
) -> Optional[np.ndarray]:
    """Compute the reconstructed position of a GCP from observations."""
    os, bs, ids = [], [], []
    for observation in point.observations:
        shot_id = observation.shot_id
        if shot_id in shots:
            shot = shots[shot_id]
            os.append(shot.pose.get_origin())
            x = observation.projection
            # Bearing in world coordinates.
            b = shot.camera.pixel_bearing(np.array(x))
            r = shot.pose.get_rotation_matrix().T
            bs.append(r.dot(b))
            ids.append(shot_id)

    if len(os) >= 2:
        thresholds = len(os) * [reproj_threshold]
        valid_triangulation, X = pygeometry.triangulate_bearings_midpoint(
            np.asarray(os),
            np.asarray(bs),
            thresholds,
            np.radians(min_ray_angle_degrees),
        )
        if valid_triangulation:
            return X
    return None