opensfm/io.py (1,120 lines of code) (raw):
import json
import logging
import os
import shutil
from abc import ABC, abstractmethod
from typing import Dict, Any, Iterable, List, IO, Tuple, TextIO, Optional
import cv2
import numpy as np
import pyproj
from opensfm import context, features, geo, pygeometry, pymap, types
from PIL import Image
logger = logging.getLogger(__name__)
def camera_from_json(key: str, obj: Dict[str, Any]) -> pygeometry.Camera:
"""
Read camera from a json object
"""
camera = None
pt = obj.get("projection_type", "perspective")
if pt == "perspective":
camera = pygeometry.Camera.create_perspective(
obj["focal"], obj.get("k1", 0.0), obj.get("k2", 0.0)
)
elif pt == "brown":
camera = pygeometry.Camera.create_brown(
obj["focal_x"],
obj["focal_y"] / obj["focal_x"],
np.array([obj.get("c_x", 0.0), obj.get("c_y", 0.0)]),
np.array(
[
obj.get("k1", 0.0),
obj.get("k2", 0.0),
obj.get("k3", 0.0),
obj.get("p1", 0.0),
obj.get("p2", 0.0),
]
),
)
elif pt == "fisheye":
camera = pygeometry.Camera.create_fisheye(
obj["focal"], obj.get("k1", 0.0), obj.get("k2", 0.0)
)
elif pt == "fisheye_opencv":
camera = pygeometry.Camera.create_fisheye_opencv(
obj["focal_x"],
obj["focal_y"] / obj["focal_x"],
np.array([obj.get("c_x", 0.0), obj.get("c_y", 0.0)]),
np.array(
[
obj.get("k1", 0.0),
obj.get("k2", 0.0),
obj.get("k3", 0.0),
obj.get("k4", 0.0),
]
),
)
elif pt == "fisheye62":
camera = pygeometry.Camera.create_fisheye62(
obj["focal_x"],
obj["focal_y"] / obj["focal_x"],
np.array([obj.get("c_x", 0.0), obj.get("c_y", 0.0)]),
np.array(
[
obj.get("k1", 0.0),
obj.get("k2", 0.0),
obj.get("k3", 0.0),
obj.get("k4", 0.0),
obj.get("k5", 0.0),
obj.get("k6", 0.0),
obj.get("p1", 0.0),
obj.get("p2", 0.0),
]
),
)
elif pt == "fisheye624":
camera = pygeometry.Camera.create_fisheye624(
obj["focal_x"],
obj["focal_y"] / obj["focal_x"],
np.array([obj.get("c_x", 0.0), obj.get("c_y", 0.0)]),
np.array(
[
obj.get("k1", 0.0),
obj.get("k2", 0.0),
obj.get("k3", 0.0),
obj.get("k4", 0.0),
obj.get("k5", 0.0),
obj.get("k6", 0.0),
obj.get("p1", 0.0),
obj.get("p2", 0.0),
obj.get("s0", 0.0),
obj.get("s1", 0.0),
obj.get("s2", 0.0),
obj.get("s3", 0.0),
]
),
)
elif pt == "radial":
camera = pygeometry.Camera.create_radial(
obj["focal_x"],
obj["focal_y"] / obj["focal_x"],
np.array([obj.get("c_x", 0.0), obj.get("c_y", 0.0)]),
np.array(
[
obj.get("k1", 0.0),
obj.get("k2", 0.0),
]
),
)
elif pt == "simple_radial":
camera = pygeometry.Camera.create_simple_radial(
obj["focal_x"],
obj["focal_y"] / obj["focal_x"],
np.array([obj.get("c_x", 0.0), obj.get("c_y", 0.0)]),
obj.get("k1", 0.0),
)
elif pt == "dual":
camera = pygeometry.Camera.create_dual(
obj.get("transition", 0.5),
obj["focal"],
obj.get("k1", 0.0),
obj.get("k2", 0.0),
)
elif pygeometry.Camera.is_panorama(pt):
camera = pygeometry.Camera.create_spherical()
else:
raise NotImplementedError
camera.id = key
camera.width = int(obj.get("width", 0))
camera.height = int(obj.get("height", 0))
return camera
def pose_from_json(obj: Dict[str, Any]) -> pygeometry.Pose:
pose = pygeometry.Pose()
pose.rotation = obj["rotation"]
if "translation" in obj:
pose.translation = obj["translation"]
return pose
def bias_from_json(obj: Dict[str, Any]) -> pygeometry.Similarity:
return pygeometry.Similarity(obj["rotation"], obj["translation"], obj["scale"])
def assign_shot_attributes(obj: Dict[str, Any], shot: pymap.Shot) -> None:
shot.metadata = json_to_pymap_metadata(obj)
if "scale" in obj:
shot.scale = obj["scale"]
if "covariance" in obj:
shot.covariance = np.array(obj["covariance"])
if "merge_cc" in obj:
shot.merge_cc = obj["merge_cc"]
if "vertices" in obj and "faces" in obj:
shot.mesh.vertices = obj["vertices"]
shot.mesh.faces = obj["faces"]
def shot_in_reconstruction_from_json(
reconstruction: types.Reconstruction,
key: str,
obj: Dict[str, Any],
rig_instance_id: Optional[str] = None,
rig_camera_id: Optional[str] = None,
is_pano_shot: bool = False,
) -> pymap.Shot:
"""
Read shot from a json object and append it to a reconstruction
"""
pose = pose_from_json(obj)
if is_pano_shot:
shot = reconstruction.create_pano_shot(key, obj["camera"], pose)
else:
shot = reconstruction.create_shot(
key, obj["camera"], pose, rig_camera_id, rig_instance_id
)
assign_shot_attributes(obj, shot)
return shot
def single_shot_from_json(
key: str, obj: Dict[str, Any], camera: pygeometry.Camera
) -> pymap.Shot:
"""
Read shot from a json object
"""
pose = pose_from_json(obj)
shot = pymap.Shot(key, camera, pose)
assign_shot_attributes(obj, shot)
return shot
def point_from_json(
reconstruction: types.Reconstruction, key: str, obj: Dict[str, Any]
) -> pymap.Landmark:
"""
Read a point from a json object
"""
point = reconstruction.create_point(key, obj["coordinates"])
point.color = obj["color"]
return point
def rig_camera_from_json(key: str, obj: Dict[str, Any]) -> pymap.RigCamera:
"""
Read a rig cameras from a json object
"""
pose = pygeometry.Pose()
pose.rotation = obj["rotation"]
pose.translation = obj["translation"]
rig_camera = pymap.RigCamera(pose, key)
return rig_camera
def rig_cameras_from_json(obj: Dict[str, Any]) -> Dict[str, pymap.RigCamera]:
"""
Read rig cameras from a json object
"""
rig_cameras = {}
for key, value in obj.items():
rig_cameras[key] = rig_camera_from_json(key, value)
return rig_cameras
def rig_instance_from_json(
reconstruction: types.Reconstruction, instance_id: str, obj: Dict[str, Any]
) -> None:
"""
Read any rig instance from a json shot object
"""
reconstruction.add_rig_instance(pymap.RigInstance(instance_id))
pose = pygeometry.Pose()
pose.rotation = obj["rotation"]
pose.translation = obj["translation"]
reconstruction.rig_instances[instance_id].pose = pose
def rig_instance_camera_per_shot(obj: Dict[str, Any]) -> Dict[str, Tuple[str, str]]:
"""
Given JSON root data, return (rig_instance_id, rig_camera_id) per shot.
"""
panoshots = set(obj["pano_shots"].keys()) if "pano_shots" in obj else {}
rig_shots = {}
if "rig_instances" in obj:
rig_shots = {
s_key: (i_key, c_key)
for i_key, ri in obj["rig_instances"].items()
for s_key, c_key in ri["rig_camera_ids"].items()
if s_key not in panoshots
}
return rig_shots
def reconstruction_from_json(obj: Dict[str, Any]) -> types.Reconstruction:
"""
Read a reconstruction from a json object
"""
reconstruction = types.Reconstruction()
# Extract cameras
for key, value in obj["cameras"].items():
camera = camera_from_json(key, value)
reconstruction.add_camera(camera)
# Extract camera biases
if "biases" in obj:
for key, value in obj["biases"].items():
transform = bias_from_json(value)
reconstruction.set_bias(key, transform)
# Extract rig models
if "rig_cameras" in obj:
for key, value in obj["rig_cameras"].items():
reconstruction.add_rig_camera(rig_camera_from_json(key, value))
# Extract rig instances from shots
if "rig_instances" in obj:
for key, value in obj["rig_instances"].items():
rig_instance_from_json(reconstruction, key, value)
# Extract shots
rig_shots = rig_instance_camera_per_shot(obj)
for key, value in obj["shots"].items():
shot_in_reconstruction_from_json(
reconstruction,
key,
value,
rig_camera_id=rig_shots[key][1] if key in rig_shots else None,
rig_instance_id=rig_shots[key][0] if key in rig_shots else None,
is_pano_shot=False,
)
# Extract points
if "points" in obj:
for key, value in obj["points"].items():
point_from_json(reconstruction, key, value)
# Extract pano_shots
if "pano_shots" in obj:
for key, value in obj["pano_shots"].items():
shot_in_reconstruction_from_json(
reconstruction, key, value, is_pano_shot=True
)
# Extract reference topocentric frame
if "reference_lla" in obj:
lla = obj["reference_lla"]
reconstruction.reference = geo.TopocentricConverter(
lla["latitude"], lla["longitude"], lla["altitude"]
)
return reconstruction
def reconstructions_from_json(obj: List[Dict[str, Any]]) -> List[types.Reconstruction]:
"""
Read all reconstructions from a json object
"""
return [reconstruction_from_json(i) for i in obj]
def cameras_from_json(obj: Dict[str, Any]) -> Dict[str, pygeometry.Camera]:
"""
Read cameras from a json object
"""
cameras = {}
for key, value in obj.items():
cameras[key] = camera_from_json(key, value)
return cameras
def camera_to_json(camera) -> Dict[str, Any]:
"""
Write camera to a json object
"""
if camera.projection_type == "perspective":
return {
"projection_type": camera.projection_type,
"width": camera.width,
"height": camera.height,
"focal": camera.focal,
"k1": camera.k1,
"k2": camera.k2,
}
elif camera.projection_type == "brown":
return {
"projection_type": camera.projection_type,
"width": camera.width,
"height": camera.height,
"focal_x": camera.focal,
"focal_y": camera.focal * camera.aspect_ratio,
"c_x": camera.principal_point[0],
"c_y": camera.principal_point[1],
"k1": camera.k1,
"k2": camera.k2,
"p1": camera.p1,
"p2": camera.p2,
"k3": camera.k3,
}
elif camera.projection_type == "fisheye":
return {
"projection_type": camera.projection_type,
"width": camera.width,
"height": camera.height,
"focal": camera.focal,
"k1": camera.k1,
"k2": camera.k2,
}
elif camera.projection_type == "fisheye_opencv":
return {
"projection_type": camera.projection_type,
"width": camera.width,
"height": camera.height,
"focal_x": camera.focal,
"focal_y": camera.focal * camera.aspect_ratio,
"c_x": camera.principal_point[0],
"c_y": camera.principal_point[1],
"k1": camera.k1,
"k2": camera.k2,
"k3": camera.k3,
"k4": camera.k4,
}
elif camera.projection_type == "fisheye62":
return {
"projection_type": camera.projection_type,
"width": camera.width,
"height": camera.height,
"focal_x": camera.focal,
"focal_y": camera.focal * camera.aspect_ratio,
"c_x": camera.principal_point[0],
"c_y": camera.principal_point[1],
"k1": camera.k1,
"k2": camera.k2,
"k3": camera.k3,
"k4": camera.k4,
"k5": camera.k5,
"k6": camera.k6,
"p1": camera.p1,
"p2": camera.p2,
}
elif camera.projection_type == "fisheye624":
return {
"projection_type": camera.projection_type,
"width": camera.width,
"height": camera.height,
"focal_x": camera.focal,
"focal_y": camera.focal * camera.aspect_ratio,
"c_x": camera.principal_point[0],
"c_y": camera.principal_point[1],
"k1": camera.k1,
"k2": camera.k2,
"k3": camera.k3,
"k4": camera.k4,
"k5": camera.k5,
"k6": camera.k6,
"p1": camera.p1,
"p2": camera.p2,
"s0": camera.s0,
"s1": camera.s1,
"s2": camera.s2,
"s3": camera.s3,
}
elif camera.projection_type == "simple_radial":
return {
"projection_type": camera.projection_type,
"width": camera.width,
"height": camera.height,
"focal_x": camera.focal,
"focal_y": camera.focal * camera.aspect_ratio,
"c_x": camera.principal_point[0],
"c_y": camera.principal_point[1],
"k1": camera.k1,
}
elif camera.projection_type == "radial":
return {
"projection_type": camera.projection_type,
"width": camera.width,
"height": camera.height,
"focal_x": camera.focal,
"focal_y": camera.focal * camera.aspect_ratio,
"c_x": camera.principal_point[0],
"c_y": camera.principal_point[1],
"k1": camera.k1,
"k2": camera.k2,
}
elif camera.projection_type == "dual":
return {
"projection_type": camera.projection_type,
"width": camera.width,
"height": camera.height,
"focal": camera.focal,
"k1": camera.k1,
"k2": camera.k2,
"transition": camera.transition,
}
elif pygeometry.Camera.is_panorama(camera.projection_type):
return {
"projection_type": camera.projection_type,
"width": camera.width,
"height": camera.height,
}
else:
raise NotImplementedError
def shot_to_json(shot: pymap.Shot) -> Dict[str, Any]:
"""
Write shot to a json object
"""
obj: Dict[str, Any] = {
"rotation": list(shot.pose.rotation),
"translation": list(shot.pose.translation),
"camera": shot.camera.id,
}
if shot.metadata is not None:
obj.update(pymap_metadata_to_json(shot.metadata))
if shot.mesh is not None:
obj["vertices"] = [list(vertice) for vertice in shot.mesh.vertices]
obj["faces"] = [list(face) for face in shot.mesh.faces]
if hasattr(shot, "scale"):
obj["scale"] = shot.scale
if hasattr(shot, "covariance"):
obj["covariance"] = shot.covariance.tolist()
if hasattr(shot, "merge_cc"):
obj["merge_cc"] = shot.merge_cc
return obj
def rig_instance_to_json(rig_instance: pymap.RigInstance) -> Dict[str, Any]:
"""
Write a rig instance to a json object
"""
return {
"translation": list(rig_instance.pose.translation),
"rotation": list(rig_instance.pose.rotation),
"rig_camera_ids": rig_instance.rig_camera_ids,
}
def rig_camera_to_json(rig_camera: pymap.RigCamera) -> Dict[str, Any]:
"""
Write a rig camera to a json object
"""
obj = {
"rotation": list(rig_camera.pose.rotation),
"translation": list(rig_camera.pose.translation),
}
return obj
def pymap_metadata_to_json(metadata: pymap.ShotMeasurements) -> Dict[str, Any]:
obj = {}
if metadata.orientation.has_value:
obj["orientation"] = metadata.orientation.value
if metadata.capture_time.has_value:
obj["capture_time"] = metadata.capture_time.value
if metadata.gps_accuracy.has_value:
obj["gps_dop"] = metadata.gps_accuracy.value
if metadata.gps_position.has_value:
obj["gps_position"] = list(metadata.gps_position.value)
if metadata.accelerometer.has_value:
obj["accelerometer"] = list(metadata.accelerometer.value)
if metadata.compass_angle.has_value and metadata.compass_accuracy.has_value:
obj["compass"] = {
"angle": metadata.compass_angle.value,
"accuracy": metadata.compass_accuracy.value,
}
else:
if metadata.compass_angle.has_value:
obj["compass"] = {"angle": metadata.compass_angle.value}
elif metadata.compass_accuracy.has_value:
obj["compass"] = {"accuracy": metadata.compass_accuracy.value}
if metadata.sequence_key.has_value:
obj["skey"] = metadata.sequence_key.value
return obj
def json_to_pymap_metadata(obj: Dict[str, Any]) -> pymap.ShotMeasurements:
metadata = pymap.ShotMeasurements()
if obj.get("orientation") is not None:
metadata.orientation.value = obj.get("orientation")
if obj.get("capture_time") is not None:
metadata.capture_time.value = obj.get("capture_time")
if obj.get("gps_dop") is not None:
metadata.gps_accuracy.value = obj.get("gps_dop")
if obj.get("gps_position") is not None:
metadata.gps_position.value = obj.get("gps_position")
if obj.get("skey") is not None:
metadata.sequence_key.value = obj.get("skey")
if obj.get("accelerometer") is not None:
metadata.accelerometer.value = obj.get("accelerometer")
if obj.get("compass") is not None:
compass = obj.get("compass")
if "angle" in compass:
metadata.compass_angle.value = compass["angle"]
if "accuracy" in compass:
metadata.compass_accuracy.value = compass["accuracy"]
return metadata
def point_to_json(point: pymap.Landmark) -> Dict[str, Any]:
"""
Write a point to a json object
"""
return {
"color": list(point.color.astype(float)),
"coordinates": list(point.coordinates),
}
def reconstruction_to_json(reconstruction: types.Reconstruction) -> Dict[str, Any]:
"""
Write a reconstruction to a json object
"""
obj = {"cameras": {}, "shots": {}, "points": {}, "biases": {}}
# Extract cameras
for camera in reconstruction.cameras.values():
obj["cameras"][camera.id] = camera_to_json(camera)
# Extract cameras biases
for camera_id, bias in reconstruction.biases.items():
obj["biases"][camera_id] = bias_to_json(bias)
# Extract rig models
if len(reconstruction.rig_cameras):
obj["rig_cameras"] = {}
for rig_camera in reconstruction.rig_cameras.values():
obj["rig_cameras"][rig_camera.id] = rig_camera_to_json(rig_camera)
if len(reconstruction.rig_instances):
obj["rig_instances"] = {}
for rig_instance in reconstruction.rig_instances.values():
obj["rig_instances"][rig_instance.id] = rig_instance_to_json(rig_instance)
# Extract shots
for shot in reconstruction.shots.values():
obj["shots"][shot.id] = shot_to_json(shot)
# Extract points
for point in reconstruction.points.values():
obj["points"][point.id] = point_to_json(point)
# Extract pano_shots
if hasattr(reconstruction, "pano_shots"):
if len(reconstruction.pano_shots) > 0:
obj["pano_shots"] = {}
for shot in reconstruction.pano_shots.values():
obj["pano_shots"][shot.id] = shot_to_json(shot)
# Extract reference topocentric frame
if reconstruction.reference:
ref = reconstruction.reference
obj["reference_lla"] = {
"latitude": ref.lat,
"longitude": ref.lon,
"altitude": ref.alt,
}
return obj
def reconstructions_to_json(
reconstructions: Iterable[types.Reconstruction],
) -> List[Dict[str, Any]]:
"""
Write all reconstructions to a json object
"""
return [reconstruction_to_json(i) for i in reconstructions]
def cameras_to_json(cameras: Dict[str, pygeometry.Camera]) -> Dict[str, Dict[str, Any]]:
"""
Write cameras to a json object
"""
obj = {}
for camera in cameras.values():
obj[camera.id] = camera_to_json(camera)
return obj
def bias_to_json(bias: pygeometry.Similarity) -> Dict[str, Any]:
return {
"rotation": list(bias.rotation),
"translation": list(bias.translation),
"scale": bias.scale,
}
def rig_cameras_to_json(
rig_cameras: Dict[str, pymap.RigCamera]
) -> Dict[str, Dict[str, Any]]:
"""
Write rig cameras to a json object
"""
obj = {}
for rig_camera in rig_cameras.values():
obj[rig_camera.id] = rig_camera_to_json(rig_camera)
return obj
def camera_from_vector(
camera_id: str,
width: int,
height: int,
projection_type: str,
parameters: List[float],
) -> pygeometry.Camera:
"""Build a camera from a serialized vector of parameters."""
if projection_type == "perspective":
focal, k1, k2 = parameters
camera = pygeometry.Camera.create_perspective(focal, k1, k2)
elif projection_type == "brown":
fx, fy, cx, cy, k1, k2, p1, p2, k3 = parameters
camera = pygeometry.Camera.create_brown(
fx, fy / fx, np.array([cx, cy]), np.array([k1, k2, k3, p1, p2])
)
elif projection_type == "fisheye":
focal, k1, k2 = parameters
camera = pygeometry.Camera.create_fisheye(focal, k1, k2)
elif projection_type == "fisheye_opencv":
fx, fy, cx, cy, k1, k2, k3, k4 = parameters
camera = pygeometry.Camera.create_fisheye_opencv(
fx, fy / fx, np.array([cx, cy]), np.array([k1, k2, k3, k4])
)
elif projection_type == "fisheye62":
fx, fy, cx, cy, k1, k2, k3, k4, k5, k6, p1, p2 = parameters
camera = pygeometry.Camera.create_fisheye62(
fx, fy / fx, np.array([cx, cy]), np.array([k1, k2, k3, k4, k5, k6, p1, p2])
)
elif projection_type == "fisheye624":
fx, fy, cx, cy, k1, k2, k3, k4, k5, k6, p1, p2, s0, s1, s2, s3 = parameters
camera = pygeometry.Camera.create_fisheye624(
fx,
fy / fx,
np.array([cx, cy]),
np.array([k1, k2, k3, k4, k5, k6, p1, p2, s0, s1, s2, s3]),
)
elif projection_type == "radial":
fx, fy, cx, cy, k1, k2 = parameters
camera = pygeometry.Camera.create_radial(
fx, fy / fx, np.array([cx, cy]), np.array([k1, k2])
)
elif projection_type == "simple_radial":
fx, fy, cx, cy, k1 = parameters
camera = pygeometry.Camera.create_simple_radial(
fx, fy / fx, np.array([cx, cy]), k1
)
elif projection_type == "dual":
focal, k1, k2, transition = parameters
camera = pygeometry.Camera.create_dual(transition, focal, k1, k2)
elif pygeometry.Camera.is_panorama(projection_type):
camera = pygeometry.Camera.create_spherical()
else:
raise NotImplementedError
camera.id = camera_id
camera.width = width
camera.height = height
return camera
def camera_to_vector(camera: pygeometry.Camera) -> List[float]:
"""Serialize camera parameters to a vector of floats."""
if camera.projection_type == "perspective":
parameters = [camera.focal, camera.k1, camera.k2]
elif camera.projection_type == "brown":
parameters = [
camera.focal,
camera.focal * camera.aspect_ratio,
camera.principal_point[0],
camera.principal_point[1],
camera.k1,
camera.k2,
camera.p1,
camera.p2,
camera.k3,
]
elif camera.projection_type == "fisheye":
parameters = [camera.focal, camera.k1, camera.k2]
elif camera.projection_type == "fisheye_opencv":
parameters = [
camera.focal,
camera.focal * camera.aspect_ratio,
camera.principal_point[0],
camera.principal_point[1],
camera.k1,
camera.k2,
camera.k3,
camera.k4,
]
elif camera.projection_type == "fisheye62":
parameters = [
camera.focal,
camera.focal * camera.aspect_ratio,
camera.principal_point[0],
camera.principal_point[1],
camera.k1,
camera.k2,
camera.k3,
camera.k4,
camera.k5,
camera.k6,
camera.p1,
camera.p2,
]
elif camera.projection_type == "fisheye624":
parameters = [
camera.focal,
camera.focal * camera.aspect_ratio,
camera.principal_point[0],
camera.principal_point[1],
camera.k1,
camera.k2,
camera.k3,
camera.k4,
camera.k5,
camera.k6,
camera.p1,
camera.p2,
camera.s0,
camera.s1,
camera.s2,
camera.s3,
]
elif camera.projection_type == "radial":
parameters = [
camera.focal,
camera.focal * camera.aspect_ratio,
camera.principal_point[0],
camera.principal_point[1],
camera.k1,
camera.k2,
]
elif camera.projection_type == "simple_radial":
parameters = [
camera.focal,
camera.focal * camera.aspect_ratio,
camera.principal_point[0],
camera.principal_point[1],
camera.k1,
]
elif camera.projection_type == "dual":
parameters = [
camera.focal,
camera.k1,
camera.k2,
camera.transition,
]
elif pygeometry.Camera.is_panorama(camera.projection_type):
parameters = []
else:
raise NotImplementedError
return parameters
def _read_gcp_list_lines(
lines: Iterable[str],
projection,
exifs: Dict[str, Dict[str, Any]],
) -> List[pymap.GroundControlPoint]:
points = {}
for line in lines:
words = line.split(None, 5)
easting, northing, alt, pixel_x, pixel_y = map(float, words[:5])
key = (easting, northing, alt)
shot_tokens = words[5].split(None)
shot_id = shot_tokens[0]
if shot_id not in exifs:
continue
if key in points:
point = points[key]
else:
# Convert 3D coordinates
if np.isnan(alt):
alt = 0
has_altitude = False
else:
has_altitude = True
if projection is not None:
lat, lon = projection.transform(easting, northing)
else:
lon, lat = easting, northing
point = pymap.GroundControlPoint()
point.id = "unnamed-%d" % len(points)
point.lla = {"latitude": lat, "longitude": lon, "altitude": alt}
point.has_altitude = has_altitude
points[key] = point
# Convert 2D coordinates
d = exifs[shot_id]
coordinates = features.normalized_image_coordinates(
np.array([[pixel_x, pixel_y]]), d["width"], d["height"]
)[0]
o = pymap.GroundControlPointObservation()
o.shot_id = shot_id
o.projection = coordinates
point.add_observation(o)
return list(points.values())
def _parse_utm_projection_string(line: str) -> str:
"""Convert strings like 'WGS84 UTM 32N' to a proj4 definition."""
words = line.lower().split()
assert len(words) == 3
zone = line.split()[2].upper()
if zone[-1] == "N":
zone_number = int(zone[:-1])
zone_hemisphere = "north"
elif zone[-1] == "S":
zone_number = int(zone[:-1])
zone_hemisphere = "south"
else:
zone_number = int(zone)
zone_hemisphere = "north"
s = "+proj=utm +zone={} +{} +ellps=WGS84 +datum=WGS84 +units=m +no_defs"
return s.format(zone_number, zone_hemisphere)
def _parse_projection(line: str):
"""Build a proj4 from the GCP format line."""
crs_4326 = pyproj.CRS.from_epsg(4326)
if line.strip() == "WGS84":
return None
elif line.upper().startswith("WGS84 UTM"):
return pyproj.Transformer.from_proj(pyproj.CRS(_parse_utm_projection_string(line)), crs_4326)
elif "+proj" in line:
return pyproj.Transformer.from_proj(pyproj.CRS(line), crs_4326)
elif line.upper().startswith("EPSG:"):
return pyproj.Transformer.from_proj(pyproj.CRS.from_epsg(int(line.split(":")[1])), crs_4326)
else:
raise ValueError("Un-supported geo system definition: {}".format(line))
def _valid_gcp_line(line: str) -> bool:
stripped = line.strip()
return stripped != "" and stripped[0] != "#"
def read_gcp_list(fileobj, exif: Dict[str, Any]) -> List[pymap.GroundControlPoint]:
"""Read a ground control points from a gcp_list.txt file.
It requires the points to be in the WGS84 lat, lon, alt format.
If reference is None, topocentric data won't be initialized.
"""
all_lines = fileobj.readlines()
lines = iter(filter(_valid_gcp_line, all_lines))
projection = _parse_projection(next(lines))
points = _read_gcp_list_lines(lines, projection, exif)
return points
def read_ground_control_points(fileobj: IO) -> List[pymap.GroundControlPoint]:
"""Read ground control points from json file"""
obj = json_load(fileobj)
points = []
for point_dict in obj["points"]:
point = pymap.GroundControlPoint()
point.id = point_dict["id"]
lla = point_dict.get("position")
if lla:
point.lla = lla
point.has_altitude = "altitude" in point.lla
observations = []
observing_images = set()
for o_dict in point_dict["observations"]:
o = pymap.GroundControlPointObservation()
o.shot_id = o_dict["shot_id"]
if o.shot_id in observing_images:
logger.warning(
"GCP {} has multiple observations in image {}".format(
point.id, o.shot_id
)
)
observing_images.add(o.shot_id)
if "projection" in o_dict:
o.projection = np.array(o_dict["projection"])
observations.append(o)
point.observations = observations
points.append(point)
return points
def write_ground_control_points(
gcp: List[pymap.GroundControlPoint],
fileobj: IO,
) -> None:
"""Write ground control points to json file."""
obj = {"points": []}
for point in gcp:
point_obj = {}
point_obj["id"] = point.id
if point.lla:
point_obj["position"] = {
"latitude": point.lla["latitude"],
"longitude": point.lla["longitude"],
}
if point.has_altitude:
point_obj["position"]["altitude"] = point.lla["altitude"]
point_obj["observations"] = []
for observation in point.observations:
point_obj["observations"].append(
{
"shot_id": observation.shot_id,
"projection": tuple(observation.projection),
}
)
obj["points"].append(point_obj)
json_dump(obj, fileobj)
def json_dump_kwargs(minify: bool = False) -> Dict[str, Any]:
if minify:
indent, separators = None, (",", ":")
else:
indent, separators = 4, None
return {"indent": indent, "ensure_ascii": False, "separators": separators}
def json_dump(data, fout, minify=False):
kwargs = json_dump_kwargs(minify)
return json.dump(data, fout, **kwargs)
def json_dumps(data, minify=False):
kwargs = json_dump_kwargs(minify)
return json.dumps(data, **kwargs)
def json_load(fp):
return json.load(fp)
def json_loads(text):
return json.loads(text)
# PLY
def ply_header(
count_vertices: int, with_normals: bool = False, point_num_views: bool = False
) -> List[str]:
if with_normals:
header = [
"ply",
"format ascii 1.0",
"element vertex {}".format(count_vertices),
"property float x",
"property float y",
"property float z",
"property float nx",
"property float ny",
"property float nz",
"property uchar diffuse_red",
"property uchar diffuse_green",
"property uchar diffuse_blue",
]
else:
header = [
"ply",
"format ascii 1.0",
"element vertex {}".format(count_vertices),
"property float x",
"property float y",
"property float z",
"property uchar diffuse_red",
"property uchar diffuse_green",
"property uchar diffuse_blue",
]
if point_num_views:
header += ["property uchar views"]
header += ["end_header"]
return header
def points_to_ply_string(vertices: List[str], point_num_views: bool = False):
header = ply_header(len(vertices), point_num_views=point_num_views)
return "\n".join(header + vertices + [""])
def reconstruction_to_ply(
reconstruction: types.Reconstruction,
tracks_manager: Optional[pymap.TracksManager] = None,
no_cameras: bool = False,
no_points: bool = False,
point_num_views: bool = False,
):
"""Export reconstruction points as a PLY string."""
vertices = []
if not no_points:
for point in reconstruction.points.values():
p, c = point.coordinates, point.color
s = "{} {} {} {} {} {}".format(
p[0], p[1], p[2], int(c[0]), int(c[1]), int(c[2])
)
if point_num_views and tracks_manager:
obs_count = point.number_of_observations()
if obs_count == 0:
obs_count = len(tracks_manager.get_track_observations(point.id))
s += " {}".format(obs_count)
vertices.append(s)
if not no_cameras:
for shot in reconstruction.shots.values():
o = shot.pose.get_origin()
R = shot.pose.get_rotation_matrix()
for axis in range(3):
c = 255 * np.eye(3)[axis]
for depth in np.linspace(0, 2, 10):
p = o + depth * R[axis]
s = "{} {} {} {} {} {}".format(
p[0], p[1], p[2], int(c[0]), int(c[1]), int(c[2])
)
if point_num_views:
s += " 0"
vertices.append(s)
return points_to_ply_string(vertices, point_num_views)
def point_cloud_from_ply(
fp: TextIO,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
"""Load point cloud from a PLY file."""
all_lines = fp.read().splitlines()
start = all_lines.index("end_header") + 1
lines = all_lines[start:]
n = len(lines)
points = np.zeros((n, 3), dtype=np.float32)
normals = np.zeros((n, 3), dtype=np.float32)
colors = np.zeros((n, 3), dtype=np.uint8)
labels = np.zeros((n,), dtype=np.uint8)
for i, row in enumerate(lines):
words = row.split()
label = int(words[9])
points[i] = list(map(float, words[0:3]))
normals[i] = list(map(float, words[3:6]))
colors[i] = list(map(int, words[6:9]))
labels[i] = label
return points, normals, colors, labels
def point_cloud_to_ply(
points: np.ndarray,
normals: np.ndarray,
colors: np.ndarray,
labels: np.ndarray,
fp: TextIO,
):
fp.write("ply\n")
fp.write("format ascii 1.0\n")
fp.write("element vertex {}\n".format(len(points)))
fp.write("property float x\n")
fp.write("property float y\n")
fp.write("property float z\n")
fp.write("property float nx\n")
fp.write("property float ny\n")
fp.write("property float nz\n")
fp.write("property uchar diffuse_red\n")
fp.write("property uchar diffuse_green\n")
fp.write("property uchar diffuse_blue\n")
fp.write("property uchar class\n")
fp.write("end_header\n")
template = "{:.4f} {:.4f} {:.4f} {:.3f} {:.3f} {:.3f} {} {} {} {}\n"
for i in range(len(points)):
p, n, c, l = points[i], normals[i], colors[i], labels[i]
fp.write(
template.format(
p[0],
p[1],
p[2],
n[0],
n[1],
n[2],
int(c[0]),
int(c[1]),
int(c[2]),
int(l),
)
)
# Filesystem interaction methods
def mkdir_p(path: str) -> None:
"""Make a directory including parent directories."""
os.makedirs(path, exist_ok=True)
def open_wt(path: str) -> IO[Any]:
"""Open a file in text mode for writing utf-8."""
return open(path, "w", encoding="utf-8")
def open_rt(path: str) -> IO[Any]:
"""Open a file in text mode for reading utf-8."""
return open(path, "r", encoding="utf-8")
def imread(
path: str, grayscale: bool = False, unchanged: bool = False, anydepth: bool = False
):
with open(path, "rb") as fb:
return imread_from_fileobject(fb, grayscale, unchanged, anydepth)
def imread_from_fileobject(
fb, grayscale: bool = False, unchanged: bool = False, anydepth: bool = False
) -> np.ndarray:
"""Load image as an array ignoring EXIF orientation."""
if context.OPENCV3:
if grayscale:
flags = cv2.IMREAD_GRAYSCALE
elif unchanged:
flags = cv2.IMREAD_UNCHANGED
else:
flags = cv2.IMREAD_COLOR
try:
flags |= cv2.IMREAD_IGNORE_ORIENTATION
except AttributeError:
logger.warning(
"OpenCV version {} does not support loading images without "
"rotating them according to EXIF. Please upgrade OpenCV to "
"version 3.2 or newer.".format(cv2.__version__)
)
if anydepth:
flags |= cv2.IMREAD_ANYDEPTH
else:
if grayscale:
flags = cv2.CV_LOAD_IMAGE_GRAYSCALE
elif unchanged:
flags = cv2.CV_LOAD_IMAGE_UNCHANGED
else:
flags = cv2.CV_LOAD_IMAGE_COLOR
if anydepth:
flags |= cv2.CV_LOAD_IMAGE_ANYDEPTH
im_buffer = np.asarray(bytearray(fb.read()), dtype=np.uint8)
image = cv2.imdecode(im_buffer, flags)
if image is None:
raise IOError("Unable to load image")
if len(image.shape) == 3:
image[:, :, :3] = image[:, :, [2, 1, 0]] # Turn BGR to RGB (or BGRA to RGBA)
return image
@classmethod
def imwrite(cls, path: str, image: np.ndarray) -> None:
with cls.open(path, "wb") as fwb:
imwrite(fwb, image, path)
def imwrite(path: str, image: np.ndarray):
with open(path, "wb") as fwb:
return imwrite_from_fileobject(fwb, image, path)
def imwrite_from_fileobject(fwb, image: np.ndarray, ext: str) -> None:
"""Write an image to a file object"""
if len(image.shape) == 3:
image[:, :, :3] = image[:, :, [2, 1, 0]] # Turn RGB to BGR (or RGBA to BGRA)
_, im_buffer = cv2.imencode(ext, image)
fwb.write(im_buffer)
def image_size_from_fileobject(fb):
"""Height and width of an image."""
try:
with Image.open(fb) as img:
width, height = img.size
return height, width
except Exception:
# Slower fallback
image = imread(fb.name)
return image.shape[:2]
def image_size(path: str) -> Tuple[int, int]:
"""Height and width of an image."""
with open(path, "rb") as fb:
return image_size_from_fileobject(fb)
# IO Filesystem
class IoFilesystemBase(ABC):
@classmethod
@abstractmethod
def exists(cls, path: str):
pass
@classmethod
def ls(cls, path: str):
pass
@classmethod
@abstractmethod
def isfile(cls, path: str):
pass
@classmethod
@abstractmethod
def isdir(cls, path: str):
pass
@classmethod
def rm_if_exist(cls, filename: str):
pass
@classmethod
def symlink(cls, src_path: str, dst_path: str, **kwargs):
pass
@classmethod
@abstractmethod
def open(cls, *args, **kwargs):
pass
@classmethod
@abstractmethod
def open_wt(cls, path: str):
pass
@classmethod
@abstractmethod
def open_rt(cls, path: str):
pass
@classmethod
@abstractmethod
def mkdir_p(cls, path: str):
pass
@classmethod
@abstractmethod
def imwrite(cls, path: str, image):
pass
@classmethod
@abstractmethod
def imread(cls, path: str, grayscale=False, unchanged=False, anydepth=False):
pass
@classmethod
@abstractmethod
def image_size(cls, path: str):
pass
@classmethod
@abstractmethod
def timestamp(cls, path: str):
pass
class IoFilesystemDefault(IoFilesystemBase):
def __init__(self):
self.type = "default"
@classmethod
def exists(cls, path: str) -> str:
return os.path.exists(path)
@classmethod
def ls(cls, path: str) -> List[str]:
return os.listdir(path)
@classmethod
def isfile(cls, path: str) -> str:
return os.path.isfile(path)
@classmethod
def isdir(cls, path: str) -> str:
return os.path.isdir(path)
@classmethod
def rm_if_exist(cls, filename: str) -> None:
if os.path.islink(filename):
os.unlink(filename)
if os.path.exists(filename):
if os.path.isdir(filename):
shutil.rmtree(filename)
else:
os.remove(filename)
@classmethod
def symlink(cls, src_path: str, dst_path: str, **kwargs):
os.symlink(src_path, dst_path, **kwargs)
@classmethod
def open(cls, *args, **kwargs):
return open(*args, **kwargs)
@classmethod
def open_wt(cls, path: str):
return cls.open(path, "w", encoding="utf-8")
@classmethod
def open_rt(cls, path: str):
return cls.open(path, "r", encoding="utf-8")
@classmethod
def mkdir_p(cls, path: str):
return os.makedirs(path, exist_ok=True)
@classmethod
def imread(
cls,
path: str,
grayscale: bool = False,
unchanged: bool = False,
anydepth: bool = False,
):
with cls.open(path, "rb") as fb:
return imread_from_fileobject(fb, grayscale, unchanged, anydepth)
@classmethod
def imwrite(cls, path: str, image) -> None:
with cls.open(path, "wb") as fwb:
imwrite_from_fileobject(fwb, image, path)
@classmethod
def image_size(cls, path: str) -> Tuple[int, int]:
with cls.open(path, "rb") as fb:
return image_size_from_fileobject(fb)
@classmethod
def timestamp(cls, path: str) -> str:
return os.path.getmtime(path)