annotation_gui_gcp/lib/GUI.py (199 lines of code) (raw):
import os
import random
import subprocess
import sys
import time
from collections import defaultdict
import flask
from annotation_gui_gcp.lib.views.cad_view import CADView
from annotation_gui_gcp.lib.views.cp_finder_view import ControlPointFinderView
from annotation_gui_gcp.lib.views.image_view import ImageView
from annotation_gui_gcp.lib.views.tools_view import ToolsView
from opensfm import dataset
class Gui:
def __init__(
self,
app,
gcp_manager,
image_manager,
rig_groups=None,
cad_paths=(),
):
self.gcp_manager = gcp_manager
self.image_manager = image_manager
self.curr_point = None
self.shot_std = {}
self.rig_groups = rig_groups if rig_groups else {}
self.path = self.gcp_manager.path
self.app = app
self.ix_a = 0
self.ix_b = 1
self.reconstruction_options = self.get_reconstruction_options()
self.create_ui(cad_paths)
self.load_gcps()
self.load_analysis_results(self.ix_a, self.ix_b)
def get_reconstruction_options(self):
p_recs = self.path + "/reconstruction.json" if self.path else None
if p_recs is None or not os.path.exists(p_recs):
return ["NONE", "NONE"]
data = dataset.DataSet(self.path)
recs = data.load_reconstruction()
options = []
for ix, rec in enumerate(recs):
camcount = defaultdict(int)
for shot in rec.shots.values():
camcount[shot.camera.id] += 1
str_repr = f"REC#{ix}: " + ", ".join(
f"{k}({v})" for k, v in camcount.items()
)
options.append(str_repr)
options.append("None (3d-to-2d)")
return options
def sync_to_client(self) -> None:
for view in self.sequence_views + self.cad_views + [self.tools_view]:
view.sync_to_client()
def create_ui(self, cad_paths):
subpane_routes = []
has_views_that_need_tracking = len(cad_paths) > 0
self.tools_view = ToolsView(self, self.app)
self.sequence_views = []
for ix, image_keys in enumerate(self.image_manager.seqs.values()):
v = ImageView(
self,
self.app,
f"/sequence_view_{ix+1}",
image_keys,
has_views_that_need_tracking,
)
self.sequence_views.append(v)
cp_view = ControlPointFinderView(self, self.app)
self.sequence_views.append(cp_view)
self.cad_views = []
for ix, cad_path in enumerate(cad_paths):
v = CADView(self, self.app, f"/cad_view_{ix+1}", cad_path)
self.cad_views.append(v)
subpane_routes = [
v.route_prefix for v in (self.sequence_views + self.cad_views)
]
@self.app.route("/")
def send_main_page():
return flask.render_template("mosaic.html", subpane_routes=subpane_routes)
def analyze_rigid(self):
self.analyze(rigid=True, covariance=False)
def analyze_flex(self):
self.analyze(rigid=False, covariance=False)
def analyze(self, rigid=False, covariance=True):
t = time.time() - os.path.getmtime(self.path + "/ground_control_points.json")
# ix_a = self.reconstruction_options.index(self.rec_a.get())
# ix_b = self.reconstruction_options.index(self.rec_b.get())
ix_a = self.ix_a
ix_b = self.ix_b
if t > 30:
print(
"Please save to ground_control_points.json before running the analysis"
)
return
args = [
sys.executable,
os.path.join(os.path.dirname(os.path.dirname(__file__)), "run_ba.py"),
self.path,
"--rec_a",
str(ix_a),
]
if ix_b < len(self.reconstruction_options) - 1:
args.extend(("--rec_b", str(ix_b)))
else:
ix_b = None
if rigid:
args.extend(["--rigid"])
if covariance:
args.extend(["--covariance"])
# Call the run_ba script
subprocess.run(args)
# Load the results
self.load_analysis_results(ix_a, ix_b)
for view in self.sequence_views:
view.populate_image_list()
print("Done analyzing")
def load_analysis_results(self, ix_a, ix_b):
self.load_shot_std(f"{self.path}/shots_std_{ix_a}x{ix_b}.csv")
p_gcp_errors = f"{self.path}/gcp_reprojections_{ix_a}x{ix_b}.json"
self.gcp_manager.load_gcp_reprojections(p_gcp_errors)
def load_shot_std(self, path):
self.shot_std = {}
if os.path.isfile(path):
with open(path, "r") as f:
for line in f:
shot, std = line[:-1].split(",")
self.shot_std[shot] = float(std)
def load_gcps(self, filename=None):
self.gcp_manager.load_from_file(filename)
for view in self.sequence_views + self.cad_views:
view.display_points()
view.populate_image_list()
self.populate_gcp_list()
def add_gcp(self):
new_gcp = self.gcp_manager.add_point()
self.populate_gcp_list()
self.update_active_gcp(new_gcp)
def toggle_sticky_zoom(self):
if self.sticky_zoom.get():
self.sticky_zoom.set(False)
else:
self.sticky_zoom.set(True)
def populate_gcp_list(self):
pass
def remove_gcp(self):
to_be_removed_point = self.curr_point
if not to_be_removed_point:
return
self.gcp_manager.remove_gcp(to_be_removed_point)
self.populate_gcp_list()
self.update_active_gcp(None)
def update_active_gcp(self, new_active_gcp):
print("Active GCP is now", new_active_gcp)
self.curr_point = new_active_gcp
for view in self.sequence_views + self.cad_views:
view.display_points()
if self.curr_point:
view.highlight_gcp_reprojection(self.curr_point, zoom=False)
def save_gcps(self, filename=None):
self.gcp_manager.write_to_file(filename)
def go_to_current_gcp(self):
"""
Jumps to the currently selected GCP in all views where it was not visible
"""
if not self.curr_point:
return
shots_gcp_seen = {
p["shot_id"] for p in self.gcp_manager.points[self.curr_point]
}
for view in self.sequence_views:
shots_gcp_seen_this_view = list(
shots_gcp_seen.intersection(view.images_in_list)
)
if (
len(shots_gcp_seen_this_view) > 0
and view.current_image not in shots_gcp_seen
):
target_shot = random.choice(shots_gcp_seen_this_view)
view.bring_new_image(target_shot)
def go_to_worst_gcp(self):
if len(self.gcp_manager.gcp_reprojections) == 0:
print("No GCP reprojections available. Can't jump to worst GCP")
return
worst_gcp = self.gcp_manager.get_worst_gcp()
if worst_gcp is None:
return
self.curr_point = worst_gcp
self.gcp_list_box.selection_clear(0, "end")
for ix, gcp_id in enumerate(self.gcp_list_box.get(0, "end")):
if worst_gcp in gcp_id:
self.gcp_list_box.selection_set(ix)
break
for view in self.sequence_views:
# Get the shot with worst reprojection error that in this view
shot_worst_gcp = self.gcp_manager.shot_with_max_gcp_error(
view.images_in_list, worst_gcp
)
if shot_worst_gcp:
view.bring_new_image(shot_worst_gcp)
def clear_latlon_sources(self, view):
# The user has activated the 'Track this' checkbox in some view
for v in self.sequence_views:
if v is not view:
v.is_latlon_source.set(False)
def refocus_overhead_views(self, lat, lon):
for view in self.cad_views:
view.refocus(lat, lon)