opensfm/actions/export_colmap.py
# Copyright (c) 2018, ETH Zurich and UNC Chapel Hill.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# * Neither the name of ETH Zurich and UNC Chapel Hill nor the names of
# its contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# Author: Johannes L. Schoenberger (jsch at inf.ethz.ch)
# This script is based on an original implementation by True Price.
import math
import os
import sqlite3
import sys
import tempfile
import typing as t
from struct import pack
import numpy as np
from opensfm import features
from opensfm import matching
from opensfm.dataset import DataSet
I_3 = np.eye(3)
def run_dataset(data: DataSet, binary: bool) -> None:
"""Export reconstruction to COLMAP format."""
export_folder = os.path.join(data.data_path, "colmap_export")
data.io_handler.mkdir_p(export_folder)
database_path = os.path.join(export_folder, "colmap_database.db")
with tempfile.TemporaryDirectory() as tmp_dir:
tmp_database_path = os.path.join(tmp_dir, "colmap_database.db")
images_path = os.path.join(data.data_path, "images")
db = COLMAPDatabase.connect(tmp_database_path)
db.create_tables()
images_map, camera_map = export_cameras(data, db)
features_map = export_features(data, db, images_map)
export_matches(data, db, features_map, images_map)
if data.reconstruction_exists():
export_ini_file(export_folder, database_path, images_path, data.io_handler)
export_cameras_reconstruction(data, export_folder, camera_map, binary)
points_map = export_points_reconstruction(
data, export_folder, images_map, binary
)
export_images_reconstruction(
data,
export_folder,
camera_map,
images_map,
features_map,
points_map,
binary,
)
db.commit()
db.close()
data.io_handler.rm_if_exist(database_path)
with data.io_handler.open(tmp_database_path, "rb") as f:
with data.io_handler.open(database_path, "wb") as fwb:
fwb.write(f.read())
IS_PYTHON3: bool = int(sys.version_info[0]) >= 3
MAX_IMAGE_ID = 2 ** 31 - 1
CREATE_CAMERAS_TABLE = """CREATE TABLE IF NOT EXISTS cameras (
camera_id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
model INTEGER NOT NULL,
width INTEGER NOT NULL,
height INTEGER NOT NULL,
params BLOB,
prior_focal_length INTEGER NOT NULL)"""
CREATE_DESCRIPTORS_TABLE = """CREATE TABLE IF NOT EXISTS descriptors (
image_id INTEGER PRIMARY KEY NOT NULL,
rows INTEGER NOT NULL,
cols INTEGER NOT NULL,
data BLOB,
FOREIGN KEY(image_id) REFERENCES images(image_id) ON DELETE CASCADE)"""
CREATE_IMAGES_TABLE: str = """CREATE TABLE IF NOT EXISTS images (
image_id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
name TEXT NOT NULL UNIQUE,
camera_id INTEGER NOT NULL,
prior_qw REAL,
prior_qx REAL,
prior_qy REAL,
prior_qz REAL,
prior_tx REAL,
prior_ty REAL,
prior_tz REAL,
CONSTRAINT image_id_check CHECK(image_id >= 0 and image_id < {}),
FOREIGN KEY(camera_id) REFERENCES cameras(camera_id))
""".format(
MAX_IMAGE_ID
)
CREATE_TWO_VIEW_GEOMETRIES_TABLE = """
CREATE TABLE IF NOT EXISTS two_view_geometries (
pair_id INTEGER PRIMARY KEY NOT NULL,
rows INTEGER NOT NULL,
cols INTEGER NOT NULL,
data BLOB,
config INTEGER NOT NULL,
F BLOB,
E BLOB,
H BLOB)
"""
CREATE_KEYPOINTS_TABLE = """CREATE TABLE IF NOT EXISTS keypoints (
image_id INTEGER PRIMARY KEY NOT NULL,
rows INTEGER NOT NULL,
cols INTEGER NOT NULL,
data BLOB,
FOREIGN KEY(image_id) REFERENCES images(image_id) ON DELETE CASCADE)
"""
CREATE_MATCHES_TABLE = """CREATE TABLE IF NOT EXISTS matches (
pair_id INTEGER PRIMARY KEY NOT NULL,
rows INTEGER NOT NULL,
cols INTEGER NOT NULL,
data BLOB)"""
CREATE_NAME_INDEX = "CREATE UNIQUE INDEX IF NOT EXISTS index_name ON images(name)"
CREATE_ALL: str = "; ".join(
[
CREATE_CAMERAS_TABLE,
CREATE_IMAGES_TABLE,
CREATE_KEYPOINTS_TABLE,
CREATE_DESCRIPTORS_TABLE,
CREATE_MATCHES_TABLE,
CREATE_TWO_VIEW_GEOMETRIES_TABLE,
CREATE_NAME_INDEX,
]
)
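# COLMAP stores each match-table row under a single pair_id that packs both
# image ids: pair_id = image_id1 * MAX_IMAGE_ID + image_id2, with the smaller
# id always placed first so that (i, j) and (j, i) map to the same row.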
def image_ids_to_pair_id(image_id1, image_id2):
if image_id1 > image_id2:
image_id1, image_id2 = image_id2, image_id1
return image_id1 * MAX_IMAGE_ID + image_id2
def pair_id_to_image_ids(pair_id):
image_id2 = pair_id % MAX_IMAGE_ID
image_id1 = (pair_id - image_id2) // MAX_IMAGE_ID
return image_id1, image_id2
def array_to_blob(array):
if IS_PYTHON3:
return array.tobytes()
else:
return np.getbuffer(array)
def blob_to_array(blob, dtype, shape=(-1,)):
    # np.frombuffer handles both Python 2 buffers and Python 3 bytes;
    # np.fromstring is deprecated for binary input.
    return np.frombuffer(blob, dtype=dtype).reshape(*shape)
class COLMAPDatabase(sqlite3.Connection):
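    """Thin wrapper around sqlite3.Connection mirroring COLMAP's database schema.

    The add_* helpers insert rows in the layout COLMAP expects: camera params
    as float64 blobs, keypoints as float32 blobs, matches as uint32 blobs and
    descriptors as uint8 blobs.
    """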
@staticmethod
def connect(database_path):
return sqlite3.connect(database_path, factory=COLMAPDatabase)
def __init__(self, *args, **kwargs):
super(COLMAPDatabase, self).__init__(*args, **kwargs)
self.create_tables = lambda: self.executescript(CREATE_ALL)
self.create_cameras_table = lambda: self.executescript(CREATE_CAMERAS_TABLE)
self.create_descriptors_table = lambda: self.executescript(
CREATE_DESCRIPTORS_TABLE
)
self.create_images_table = lambda: self.executescript(CREATE_IMAGES_TABLE)
self.create_two_view_geometries_table = lambda: self.executescript(
CREATE_TWO_VIEW_GEOMETRIES_TABLE
)
self.create_keypoints_table = lambda: self.executescript(CREATE_KEYPOINTS_TABLE)
self.create_matches_table = lambda: self.executescript(CREATE_MATCHES_TABLE)
self.create_name_index = lambda: self.executescript(CREATE_NAME_INDEX)
def add_camera(
self, model, width, height, params, prior_focal_length=False, camera_id=None
):
params = np.asarray(params, np.float64)
cursor = self.execute(
"INSERT INTO cameras VALUES (?, ?, ?, ?, ?, ?)",
(
camera_id,
model,
width,
height,
array_to_blob(params),
prior_focal_length,
),
)
return cursor.lastrowid
def add_image(
self, name, camera_id, prior_q=(0, 0, 0, 0), prior_t=(0, 0, 0), image_id=None
):
cursor = self.execute(
"INSERT INTO images VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
(
image_id,
name,
camera_id,
prior_q[0],
prior_q[1],
prior_q[2],
prior_q[3],
prior_t[0],
prior_t[1],
prior_t[2],
),
)
return cursor.lastrowid
def add_keypoints(self, image_id, keypoints):
assert len(keypoints.shape) == 2
assert keypoints.shape[1] in [2, 4, 6]
keypoints = np.asarray(keypoints, np.float32)
self.execute(
"INSERT INTO keypoints VALUES (?, ?, ?, ?)",
(image_id,) + keypoints.shape + (array_to_blob(keypoints),),
)
def add_descriptors(self, image_id, descriptors):
descriptors = np.ascontiguousarray(descriptors, np.uint8)
self.execute(
"INSERT INTO descriptors VALUES (?, ?, ?, ?)",
(image_id,) + descriptors.shape + (array_to_blob(descriptors),),
)
def add_matches(self, image_id1, image_id2, matches):
assert len(matches.shape) == 2
assert matches.shape[1] == 2
if image_id1 > image_id2:
matches = matches[:, ::-1]
pair_id = image_ids_to_pair_id(image_id1, image_id2)
matches = np.asarray(matches, np.uint32)
self.execute(
"INSERT INTO matches VALUES (?, ?, ?, ?)",
(pair_id,) + matches.shape + (array_to_blob(matches),),
)
def add_two_view_geometry(
self, image_id1, image_id2, matches, F=I_3, E=I_3, H=I_3, config=2
):
assert len(matches.shape) == 2
assert matches.shape[1] == 2
if image_id1 > image_id2:
matches = matches[:, ::-1]
pair_id = image_ids_to_pair_id(image_id1, image_id2)
matches = np.asarray(matches, np.uint32)
F = np.asarray(F, dtype=np.float64)
E = np.asarray(E, dtype=np.float64)
H = np.asarray(H, dtype=np.float64)
self.execute(
"INSERT INTO two_view_geometries VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(pair_id,)
+ matches.shape
+ (
array_to_blob(matches),
config,
array_to_blob(F),
array_to_blob(E),
array_to_blob(H),
),
)
COLMAP_TYPES_MAP = {
"brown": "FULL_OPENCV",
"perspective": "RADIAL",
"fisheye": "RADIAL_FISHEYE",
"fisheye_opencv": "OPENCV_FISHEYE",
}
COLMAP_ID_MAP = {"brown": 6, "perspective": 3, "fisheye": 9, "fisheye_opencv": 5}
def camera_to_colmap_params(camera) -> t.Tuple[float, ...]:
w = camera.width
h = camera.height
normalizer = max(w, h)
f = camera.focal * normalizer
if camera.projection_type in ("perspective", "fisheye"):
k1 = camera.k1
k2 = camera.k2
cx = w * 0.5
cy = h * 0.5
return f, cx, cy, k1, k2
elif camera.projection_type == "brown":
fy = f * camera.aspect_ratio
c_x = w * 0.5 + normalizer * camera.principal_point[0]
c_y = h * 0.5 + normalizer * camera.principal_point[1]
k1 = camera.k1
k2 = camera.k2
k3 = camera.k3
p1 = camera.p1
p2 = camera.p2
return f, fy, c_x, c_y, k1, k2, p1, p2, k3, 0.0, 0.0, 0.0
elif camera.projection_type == "fisheye_opencv":
fy = f * camera.aspect_ratio
cx = w * 0.5 + camera.principal_point[0]
cy = h * 0.5 + camera.principal_point[1]
k1 = camera.k1
k2 = camera.k2
k3 = camera.k3
k4 = camera.k4
return f, fy, cx, cy, k1, k2, k3, k4
else:
raise ValueError("Can't convert {camera.projection_type} to COLMAP")
def export_cameras(data, db):
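    """Insert every OpenSfM camera and image into the COLMAP database.

    Returns (images_map, camera_map): mappings from OpenSfM image names and
    camera model keys to the COLMAP ids assigned by the database.
    """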
camera_map = {}
for camera_model, camera in data.load_camera_models().items():
if data.camera_models_overrides_exists():
overrides = data.load_camera_models_overrides()
if camera_model in overrides:
camera = overrides[camera_model]
parameters = camera_to_colmap_params(camera)
camera_id = db.add_camera(
COLMAP_ID_MAP[camera.projection_type],
camera.width,
camera.height,
np.array(parameters),
)
camera_map[camera_model] = camera_id
images_map = {}
for image in data.images():
camera_model = data.load_exif(image)["camera"]
image_id = db.add_image(image, camera_map[camera_model])
images_map[image] = image_id
return images_map, camera_map
def export_features(data, db, images_map):
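    """Write per-image keypoints, converted to pixel coordinates, to the database.

    Returns a map from image name to its denormalized feature array; images
    without features are skipped.
    """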
features_map = {}
for image in data.images():
width = data.load_exif(image)["width"]
height = data.load_exif(image)["height"]
features_data = data.load_features(image)
if not features_data:
continue
feat = features.denormalized_image_coordinates(
features_data.points, width, height
)
features_map[image] = feat
db.add_keypoints(images_map[image], feat)
return features_map
def export_matches(data, db, features_map, images_map) -> None:
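    """Export pairwise matches to the database.

    Matches are aggregated per unordered image pair, filtered with a robust
    fundamental-matrix fit, and the surviving inliers are stored both as raw
    matches and as verified two-view geometries.
    """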
matches_per_pair = {}
for image1 in data.images():
matches = data.load_matches(image1)
for image2, image_matches in matches.items():
pair_key = (min(image1, image2), max(image1, image2))
pair_matches = matches_per_pair.setdefault(pair_key, {})
for match in image_matches:
if image1 < image2:
pair_matches.update({(match[0], match[1]): True})
else:
pair_matches.update({(match[1], match[0]): True})
data.config["robust_matching_threshold"] = 8
for pair, matches in matches_per_pair.items():
matches_numpy = np.array([np.array([m[0], m[1]]) for m in matches])
if len(matches_numpy) < 10:
continue
F, inliers = matching.robust_match_fundamental(
features_map[pair[0]], features_map[pair[1]], matches_numpy, data.config
)
if len(inliers) > 10:
db.add_two_view_geometry(
images_map[pair[0]], images_map[pair[1]], inliers, F=F
)
db.add_matches(images_map[pair[0]], images_map[pair[1]], inliers)
def export_cameras_reconstruction(data, path, camera_map, binary: bool = False) -> None:
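    """Write the COLMAP cameras file (cameras.bin or cameras.txt)."""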
reconstructions = data.load_reconstruction()
cameras = {}
for reconstruction in reconstructions:
for camera_id, camera in reconstruction.cameras.items():
cameras[camera_id] = camera
if binary:
fout = data.io_handler.open(os.path.join(path, "cameras.bin"), "wb")
fout.write(pack("<Q", len(cameras)))
else:
fout = data.io_handler.open_wt(os.path.join(path, "cameras.txt"))
for camera_id, camera in cameras.items():
colmap_id = camera_map[camera_id]
colmap_type = COLMAP_TYPES_MAP[camera.projection_type]
w = camera.width
h = camera.height
params = camera_to_colmap_params(camera)
if binary:
fout.write(pack("<2i", colmap_id, COLMAP_ID_MAP[camera.projection_type]))
fout.write(pack("<2Q", w, h))
fout.write(pack(f"<{len(params)}d", *params))
else:
str_out = "%d %s %d %d"
for _param in params:
str_out += " %f"
str_out += "\n"
fout.write(str_out % (colmap_id, colmap_type, w, h, *params))
fout.close()
def export_images_reconstruction(
    data, path, camera_map, images_map, features_map, points_map, binary: bool = False
) -> None:
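    """Write the COLMAP images file (images.bin or images.txt).

    Each shot is exported with its pose (quaternion + translation), its COLMAP
    camera id and the 2D keypoints that are linked to exported 3D points.
    """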
reconstructions = data.load_reconstruction()
tracks_manager = data.load_tracks_manager()
if binary:
fout = data.io_handler.open(os.path.join(path, "images.bin"), "wb")
n_ims = 0
for reconstruction in reconstructions:
n_ims += len(reconstruction.shots)
fout.write(pack("<Q", n_ims))
else:
fout = data.io_handler.open_wt(os.path.join(path, "images.txt"))
for reconstruction in reconstructions:
for shot_id, shot in reconstruction.shots.items():
colmap_camera_id = camera_map[shot.camera.id]
colmap_shot_id = images_map[shot_id]
t = shot.pose.translation
q = angle_axis_to_quaternion(shot.pose.rotation)
if binary:
fout.write(pack("<I", colmap_shot_id))
fout.write(pack("<7d", *(list(q) + list(t))))
fout.write(pack("<I", colmap_camera_id))
for char in shot_id:
fout.write(pack("<c", char.encode("utf-8")))
fout.write(pack("<c", b"\x00"))
format_line = "%d %f %f %f %f %f %f %f %d %s\n"
format_tuple = [
colmap_shot_id,
q[0],
q[1],
q[2],
q[3],
t[0],
t[1],
t[2],
colmap_camera_id,
shot_id,
]
point_per_feat = {
obs.id: k
for k, obs in tracks_manager.get_shot_observations(shot_id).items()
}
points_tuple = []
for feature_id in range(len(features_map[shot_id])):
colmap_point_id = -1
if feature_id in point_per_feat:
point_id = point_per_feat[feature_id]
if point_id in points_map:
colmap_point_id = points_map[point_id]
if colmap_point_id != -1:
x, y = features_map[shot_id][feature_id]
format_line += "%f %f %d "
points_tuple += [x, y, colmap_point_id]
format_line += "\n"
if binary:
fout.write(pack("<Q", len(points_tuple) // 3))
for i in range(0, len(points_tuple), 3):
x, y, colmap_point_id = points_tuple[i : i + 3]
fout.write(pack("<2d", x, y))
fout.write(pack("<Q", colmap_point_id))
else:
fout.write(format_line % tuple(format_tuple + points_tuple))
fout.close()
def export_points_reconstruction(data, path, images_map, binary: bool = False):
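    """Write the COLMAP 3D points file (points3D.bin or points3D.txt).

    Returns a map from OpenSfM track id to the sequential COLMAP point id used
    in the export.
    """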
reconstructions = data.load_reconstruction()
tracks_manager = data.load_tracks_manager()
points_map = {}
if binary:
fout = data.io_handler.open(os.path.join(path, "points3D.bin"), "wb")
n_points = 0
for reconstruction in reconstructions:
n_points += len(reconstruction.points)
fout.write(pack("<Q", n_points))
else:
fout = data.io_handler.open_wt(os.path.join(path, "points3D.txt"))
i = 0
for reconstruction in reconstructions:
for point in reconstruction.points.values():
c = point.coordinates
cl = point.color
format_line = "%d %f %f %f %d %d %d %f "
format_tuple = [
int(i),
c[0],
c[1],
c[2],
int(cl[0]),
int(cl[1]),
int(cl[2]),
0.0,
]
if binary:
fout.write(pack("<Q", int(i)))
fout.write(pack("<3d", c[0], c[1], c[2])) # Position
fout.write(pack("<3B", *[int(i) for i in cl])) # Color
fout.write(pack("<d", 0.0)) # Error
track_tuple = []
for image, obs in tracks_manager.get_track_observations(point.id).items():
if image not in reconstruction.shots:
continue
format_line += "%d %d "
track_tuple += [images_map[image], obs.id]
format_line += "\n"
if binary:
fout.write(pack("<Q", len(track_tuple) // 2)) # Track length
for el in track_tuple:
fout.write(pack("<i", el)) # Track
else:
fout.write(format_line % tuple(format_tuple + track_tuple))
points_map[point.id] = i
i += 1
fout.close()
return points_map
def angle_axis_to_quaternion(angle_axis):
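    """Convert an angle-axis rotation vector to a [qw, qx, qy, qz] quaternion."""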
    angle = np.linalg.norm(angle_axis)
    if angle < 1e-8:
        # Near-zero rotation: return the identity quaternion instead of
        # dividing by a zero norm (the threshold here is an assumption).
        return [1.0, 0.0, 0.0, 0.0]
    x = angle_axis[0] / angle
    y = angle_axis[1] / angle
    z = angle_axis[2] / angle
qw = math.cos(angle / 2.0)
qx = x * math.sqrt(1 - qw * qw)
qy = y * math.sqrt(1 - qw * qw)
qz = z * math.sqrt(1 - qw * qw)
return [qw, qx, qy, qz]
def export_ini_file(path, db_path, images_path, io_handler) -> None:
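    """Write a minimal COLMAP project.ini pointing at the exported database and images."""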
with io_handler.open_wt(os.path.join(path, "project.ini")) as fout:
fout.write("log_to_stderr=false\nlog_level=2\n")
fout.write("database_path=%s\n" % db_path)
fout.write("image_path=%s\n" % images_path)