opensfm/report.py (424 lines of code) (raw):
import logging
import os
import subprocess
import tempfile
import PIL
from fpdf import FPDF
from opensfm import io
from opensfm.dataset import DataSet
from typing import Any, Dict
logger = logging.getLogger(__name__)
class Report:
def __init__(self, data: DataSet) -> None:
self.output_path = os.path.join(data.data_path, "stats")
self.dataset_name = os.path.basename(data.data_path)
self.io_handler = data.io_handler
self.mapi_light_light_green = [210, 245, 226]
self.mapi_light_green = [5, 203, 99]
self.mapi_light_grey = [218, 222, 228]
self.mapi_dark_grey = [99, 115, 129]
self.pdf = FPDF("P", "mm", "A4")
self.pdf.add_page()
self.title_size = 20
self.h1 = 16
self.h2 = 13
self.h3 = 10
self.text = 10
self.small_text = 8
self.margin = 10
self.cell_height = 7
self.total_size = 190
self.stats = self._read_stats_file("stats.json")
def save_report(self, filename: str) -> None:
bytestring = self.pdf.output(dest="S")
if isinstance(bytestring, str):
bytestring = bytestring.encode("utf8")
with self.io_handler.open(
os.path.join(self.output_path, filename), "wb"
) as fwb:
fwb.write(bytestring)
def _make_table(self, columns_names, rows, row_header=False) -> None:
self.pdf.set_font("Helvetica", "", self.h3)
self.pdf.set_line_width(0.3)
columns_sizes = [int(self.total_size / len(rows[0]))] * len(rows[0])
if columns_names:
self.pdf.set_draw_color(*self.mapi_light_grey)
self.pdf.set_fill_color(*self.mapi_light_grey)
for col, size in zip(columns_names, columns_sizes):
self.pdf.rect(
self.pdf.get_x(),
self.pdf.get_y(),
size,
self.cell_height,
style="FD",
)
self.pdf.set_text_color(*self.mapi_dark_grey)
self.pdf.cell(size, self.cell_height, col, align="L")
self.pdf.set_xy(self.margin, self.pdf.get_y() + self.cell_height)
self.pdf.set_draw_color(*self.mapi_light_grey)
self.pdf.set_fill_color(*self.mapi_light_light_green)
for row in rows:
for i, (col, size) in enumerate(zip(row, columns_sizes)):
if i == 0 and row_header:
self.pdf.set_draw_color(*self.mapi_light_grey)
self.pdf.set_fill_color(*self.mapi_light_grey)
self.pdf.rect(
self.pdf.get_x(),
self.pdf.get_y(),
size,
self.cell_height,
style="FD",
)
self.pdf.set_text_color(*self.mapi_dark_grey)
if i == 0 and row_header:
self.pdf.set_draw_color(*self.mapi_light_grey)
self.pdf.set_fill_color(*self.mapi_light_light_green)
self.pdf.cell(size, self.cell_height, col, align="L")
self.pdf.set_xy(self.margin, self.pdf.get_y() + self.cell_height)
def _read_stats_file(self, filename) -> Dict[str, Any]:
file_path = os.path.join(self.output_path, filename)
with self.io_handler.open_rt(file_path) as fin:
return io.json_load(fin)
def _make_section(self, title: str) -> None:
self.pdf.set_font("Helvetica", "B", self.h1)
self.pdf.set_text_color(*self.mapi_dark_grey)
self.pdf.cell(0, self.margin, title, align="L")
self.pdf.set_xy(self.margin, self.pdf.get_y() + 1.5 * self.margin)
def _make_subsection(self, title: str) -> None:
self.pdf.set_xy(self.margin, self.pdf.get_y() - 0.5 * self.margin)
self.pdf.set_font("Helvetica", "B", self.h2)
self.pdf.set_text_color(*self.mapi_dark_grey)
self.pdf.cell(0, self.margin, title, align="L")
self.pdf.set_xy(self.margin, self.pdf.get_y() + self.margin)
def _make_centered_image(self, image_path: str, desired_height: float) -> None:
with tempfile.TemporaryDirectory() as tmp_local_dir:
local_image_path = os.path.join(tmp_local_dir, os.path.basename(image_path))
with self.io_handler.open(local_image_path, "wb") as fwb:
with self.io_handler.open(image_path, "rb") as f:
fwb.write(f.read())
width, height = PIL.Image.open(local_image_path).size
resized_width = width * desired_height / height
if resized_width > self.total_size:
resized_width = self.total_size
desired_height = height * resized_width / width
self.pdf.image(
local_image_path,
self.pdf.get_x() + self.total_size / 2 - resized_width / 2,
self.pdf.get_y(),
h=desired_height,
)
self.pdf.set_xy(
self.margin, self.pdf.get_y() + desired_height + self.margin
)
def make_title(self) -> None:
# title
self.pdf.set_font("Helvetica", "B", self.title_size)
self.pdf.set_text_color(*self.mapi_light_green)
self.pdf.cell(0, self.margin, "OpenSfM Quality Report", align="C")
self.pdf.set_xy(self.margin, self.title_size)
# version number
try:
out, _ = subprocess.Popen(
["git", "describe", "--tags"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
).communicate()
version = out.strip().decode()
except BaseException as e:
logger.warning(f"Exception thrwon while extracting 'git' version, {e}")
version = ""
# indicate we don't know the version
version = "unknown" if version == "" else version
self.pdf.set_font("Helvetica", "", self.small_text)
self.pdf.set_text_color(*self.mapi_dark_grey)
self.pdf.cell(
0, self.margin, f"Processed with OpenSfM version {version}", align="R"
)
self.pdf.set_xy(self.margin, self.pdf.get_y() + 2 * self.margin)
def make_dataset_summary(self) -> None:
self._make_section("Dataset Summary")
rows = [
["Dataset", self.dataset_name],
["Date", self.stats["processing_statistics"]["date"]],
[
"Area Covered",
f"{self.stats['processing_statistics']['area']/1e6:.6f} km²",
],
[
"Processing Time",
f"{self.stats['processing_statistics']['steps_times']['Total Time']:.2f} seconds",
],
]
self._make_table(None, rows, True)
self.pdf.set_xy(self.margin, self.pdf.get_y() + self.margin)
def _has_meaningful_gcp(self) -> bool:
return (
self.stats["reconstruction_statistics"]["has_gcp"]
and "average_error" in self.stats["gcp_errors"]
)
def make_processing_summary(self) -> None:
self._make_section("Processing Summary")
rec_shots, init_shots = (
self.stats["reconstruction_statistics"]["reconstructed_shots_count"],
self.stats["reconstruction_statistics"]["initial_shots_count"],
)
rec_points, init_points = (
self.stats["reconstruction_statistics"]["reconstructed_points_count"],
self.stats["reconstruction_statistics"]["initial_points_count"],
)
geo_string = []
if self.stats["reconstruction_statistics"]["has_gps"]:
geo_string.append("GPS")
if self._has_meaningful_gcp():
geo_string.append("GCP")
ratio_shots = rec_shots / init_shots * 100 if init_shots > 0 else -1
rows = [
[
"Reconstructed Images",
f"{rec_shots} over {init_shots} shots ({ratio_shots:.1f}%)",
],
[
"Reconstructed Points",
f"{rec_points} over {init_points} points ({rec_points/init_points*100:.1f}%)",
],
[
"Reconstructed Components",
f"{self.stats['reconstruction_statistics']['components']} component",
],
[
"Detected Features",
f"{self.stats['features_statistics']['detected_features']['median']} features",
],
[
"Reconstructed Features",
f"{self.stats['features_statistics']['reconstructed_features']['median']} features",
],
["Geographic Reference", " and ".join(geo_string)],
]
row_gps_gcp = [" / ".join(geo_string) + " errors"]
geo_errors = []
if self.stats["reconstruction_statistics"]["has_gps"]:
geo_errors.append(f"{self.stats['gps_errors']['average_error']:.2f}")
if self._has_meaningful_gcp():
geo_errors.append(f"{self.stats['gcp_errors']['average_error']:.2f}")
row_gps_gcp.append(" / ".join(geo_errors) + " meters")
rows.append(row_gps_gcp)
self._make_table(None, rows, True)
self.pdf.set_xy(self.margin, self.pdf.get_y() + self.margin / 2)
topview_height = 130
topview_grids = [
f for f in self.io_handler.ls(self.output_path) if f.startswith("topview")
]
self._make_centered_image(
os.path.join(self.output_path, topview_grids[0]), topview_height
)
self.pdf.set_xy(self.margin, self.pdf.get_y() + self.margin)
def make_processing_time_details(self) -> None:
self._make_section("Processing Time Details")
columns_names = list(self.stats["processing_statistics"]["steps_times"].keys())
formatted_floats = []
for v in self.stats["processing_statistics"]["steps_times"].values():
formatted_floats.append(f"{v:.2f} sec.")
rows = [formatted_floats]
self._make_table(columns_names, rows)
self.pdf.set_xy(self.margin, self.pdf.get_y() + 2 * self.margin)
def make_gps_details(self) -> None:
self._make_section("GPS/GCP Errors Details")
# GPS
for error_type in ["gps", "gcp"]:
rows = []
columns_names = [error_type.upper(), "Mean", "Sigma", "RMS Error"]
if "average_error" not in self.stats[error_type + "_errors"]:
continue
for comp in ["x", "y", "z"]:
row = [comp.upper() + " Error (meters)"]
row.append(f"{self.stats[error_type + '_errors']['mean'][comp]:.3f}")
row.append(f"{self.stats[error_type +'_errors']['std'][comp]:.3f}")
row.append(f"{self.stats[error_type +'_errors']['error'][comp]:.3f}")
rows.append(row)
rows.append(
[
"Total",
"",
"",
f"{self.stats[error_type +'_errors']['average_error']:.3f}",
]
)
self._make_table(columns_names, rows)
self.pdf.set_xy(self.margin, self.pdf.get_y() + self.margin / 2)
rows = []
columns_names = [
"GPS Bias",
"Scale",
"Translation",
"Rotation",
]
for camera, params in self.stats["camera_errors"].items():
bias = params["bias"]
s, t, R = bias["scale"], bias["translation"], bias["rotation"]
rows.append(
[
camera,
f"{s:.2f}",
f"{t[0]:.2f} {t[1]:.2f} {t[2]:.2f}",
f"{R[0]:.2f} {R[1]:.2f} {R[2]:.2f}",
]
)
self._make_table(columns_names, rows)
self.pdf.set_xy(self.margin, self.pdf.get_y() + self.margin / 2)
def make_features_details(self) -> None:
self._make_section("Features Details")
heatmap_height = 60
heatmaps = [
f for f in self.io_handler.ls(self.output_path) if f.startswith("heatmap")
]
self._make_centered_image(
os.path.join(self.output_path, heatmaps[0]), heatmap_height
)
if len(heatmaps) > 1:
logger.warning("Please implement multi-model display")
columns_names = ["", "Min.", "Max.", "Mean", "Median"]
rows = []
for comp in ["detected_features", "reconstructed_features"]:
row = [comp.replace("_", " ").replace("features", "").capitalize()]
for t in columns_names[1:]:
row.append(
f"{self.stats['features_statistics'][comp][t.replace('.', '').lower()]:.0f}"
)
rows.append(row)
self._make_table(columns_names, rows)
self.pdf.set_xy(self.margin, self.pdf.get_y() + self.margin)
def make_reconstruction_details(self) -> None:
self._make_section("Reconstruction Details")
rows = [
[
"Average Reprojection Error (normalized / pixels / angular)",
(
f"{self.stats['reconstruction_statistics']['reprojection_error_normalized']:.2f} / "
f"{self.stats['reconstruction_statistics']['reprojection_error_pixels']:.2f} / "
f"{self.stats['reconstruction_statistics']['reprojection_error_angular']:.5f}"
),
],
[
"Average Track Length",
f"{self.stats['reconstruction_statistics']['average_track_length']:.2f} images",
],
[
"Average Track Length (> 2)",
f"{self.stats['reconstruction_statistics']['average_track_length_over_two']:.2f} images",
],
]
self._make_table(None, rows, True)
self.pdf.set_xy(self.margin, self.pdf.get_y() + self.margin / 1.5)
residual_histogram_height = 60
residual_histogram = [
f
for f in self.io_handler.ls(self.output_path)
if f.startswith("residual_histogram")
]
self._make_centered_image(
os.path.join(self.output_path, residual_histogram[0]),
residual_histogram_height,
)
self.pdf.set_xy(self.margin, self.pdf.get_y() + self.margin)
def make_camera_models_details(self) -> None:
self._make_section("Camera Models Details")
for camera, params in self.stats["camera_errors"].items():
residual_grids = [
f
for f in self.io_handler.ls(self.output_path)
if f.startswith("residuals_" + str(camera.replace("/", "_")))
]
if not residual_grids:
continue
initial = params["initial_values"]
optimized = params["optimized_values"]
names = [""] + list(initial.keys())
rows = []
rows.append(["Initial"] + [f"{x:.4f}" for x in initial.values()])
rows.append(["Optimized"] + [f"{x:.4f}" for x in optimized.values()])
self._make_subsection(camera)
self._make_table(names, rows)
self.pdf.set_xy(self.margin, self.pdf.get_y() + self.margin / 2)
residual_grid_height = 100
self._make_centered_image(
os.path.join(self.output_path, residual_grids[0]), residual_grid_height
)
def make_rig_cameras_details(self) -> None:
if len(self.stats["rig_errors"]) == 0:
return
self._make_section("Rig Cameras Details")
columns_names = [
"Translation X",
"Translation Y",
"Translation Z",
"Rotation X",
"Rotation Y",
"Rotation Z",
]
for rig_camera_id, params in self.stats["rig_errors"].items():
initial = params["initial_values"]
optimized = params["optimized_values"]
rows = []
r_init, t_init = initial["rotation"], initial["translation"]
r_opt, t_opt = optimized["rotation"], optimized["translation"]
rows.append(
[
f"{t_init[0]:.4f} m",
f"{t_init[1]:.4f} m",
f"{t_init[2]:.4f} m",
f"{r_init[0]:.4f}",
f"{r_init[1]:.4f}",
f"{r_init[2]:.4f}",
]
)
rows.append(
[
f"{t_opt[0]:.4f} m",
f"{t_opt[1]:.4f} m",
f"{t_opt[2]:.4f} m",
f"{r_opt[0]:.4f}",
f"{r_opt[1]:.4f}",
f"{r_opt[2]:.4f}",
]
)
self._make_subsection(rig_camera_id)
self._make_table(columns_names, rows)
self.pdf.set_xy(self.margin, self.pdf.get_y() + self.margin / 2)
def make_tracks_details(self) -> None:
self._make_section("Tracks Details")
matchgraph_height = 80
matchgraph = [
f
for f in self.io_handler.ls(self.output_path)
if f.startswith("matchgraph")
]
self._make_centered_image(
os.path.join(self.output_path, matchgraph[0]), matchgraph_height
)
histogram = self.stats["reconstruction_statistics"]["histogram_track_length"]
start_length, end_length = 2, 10
row_length = ["Length"]
for length, _ in sorted(histogram.items(), key=lambda x: int(x[0])):
if int(length) < start_length or int(length) > end_length:
continue
row_length.append(length)
row_count = ["Count"]
for length, count in sorted(histogram.items(), key=lambda x: int(x[0])):
if int(length) < start_length or int(length) > end_length:
continue
row_count.append(f"{count}")
self._make_table(None, [row_length, row_count], True)
self.pdf.set_xy(self.margin, self.pdf.get_y() + self.margin)
def add_page_break(self) -> None:
self.pdf.add_page("P")
def generate_report(self) -> None:
self.make_title()
self.make_dataset_summary()
self.make_processing_summary()
self.add_page_break()
self.make_features_details()
self.make_reconstruction_details()
self.add_page_break()
self.make_tracks_details()
self.make_camera_models_details()
self.make_rig_cameras_details()
self.add_page_break()
self.make_gps_details()
self.make_processing_time_details()