annotation_gui_gcp/main.py (196 lines of code) (raw):
import argparse
import json
import typing as t
from collections import OrderedDict, defaultdict
from pathlib import Path
import numpy as np
from annotation_gui_gcp.lib import GUI
from annotation_gui_gcp.lib.gcp_manager import GroundControlPointManager
from annotation_gui_gcp.lib.image_manager import ImageManager
from flask import Flask
from opensfm import dataset, io
def get_parser():
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("dataset", help="dataset")
parser.add_argument(
"--group-by-reconstruction",
action="store_true",
help="If set, the UI will show one window per reconstruction, "
"otherwise, it will use sequences as specified by 'sequence-file'",
)
parser.add_argument(
"--strict-missing",
action="store_true",
)
parser.add_argument(
"--min-images-in-reconstruction",
type=int,
default=50,
help="Reconstructions with fewer images than this value will not be "
"displayed in the UI",
)
parser.add_argument(
"--sequence-file",
help="dict-of-image-keys JSON file describing each sequence. "
"Format: {'sequence_key': ['im_key_1', 'im_key_2', ...], ...}",
default="sequence_database.json",
)
parser.add_argument(
"--cad",
type=str,
help="Specify a directory containing CAD files in FBX format",
default=None,
)
parser.add_argument(
"--port",
type=int,
default=5000,
)
return parser
def file_sanity_check(root, seq_dict, fname):
# Images available under ./images for a sanity check
available_images = {p.name for p in (root / "images").iterdir()}
keys_in_seq_dict = {im_key for seq_keys in seq_dict.values() for im_key in seq_keys}
images_not_in_seq_file = available_images - keys_in_seq_dict
if len(images_not_in_seq_file) > 0:
print(f"{len(images_not_in_seq_file)} images not in {fname}")
n_missing = len(keys_in_seq_dict - available_images)
if n_missing > 0:
print(f"There are {n_missing} images from {fname} missing in {(root/'images')}")
return available_images
def load_rig_assignments(root: Path) -> t.Dict[str, t.List[str]]:
"""
Returns a dict mapping every shot to all the other corresponding shots in the rig
"""
p_json = root / "rig_assignments.json"
if not p_json.exists():
return {}
output = {}
with open(p_json) as f:
assignments: t.Dict[str, t.List[t.Tuple[str, str]]] = json.load(f)
for shot_group in assignments.values():
group_shot_ids = [s[0] for s in shot_group]
for shot_id, _ in shot_group:
output[shot_id] = group_shot_ids
return output
def load_sequence_database_from_file(
root, fname="sequence_database.json", skip_missing: bool=False
):
"""
Simply loads a sequence file and returns it.
This doesn't require an existing SfM reconstruction
"""
root = Path(root)
p_json = root / fname
if not p_json.exists():
return None
with open(p_json) as f:
seq_dict = OrderedDict(io.json_load(f))
available_images = file_sanity_check(root, seq_dict, fname)
for skey in seq_dict:
available_image_keys = []
for k in seq_dict[skey]:
if k in available_images:
available_image_keys.append(k)
elif not skip_missing:
raise FileNotFoundError(f"{k} not found")
seq_dict[skey] = available_image_keys
empty_seqs = [skey for skey in seq_dict if not seq_dict[skey]]
for skey in empty_seqs:
del seq_dict[skey]
return seq_dict
def load_shots_from_reconstructions(path, min_ims):
data = dataset.DataSet(path)
reconstructions = data.load_reconstruction()
# Replace sequence keys for those in sequence_database.json
n_recs = len(reconstructions)
if len(reconstructions) > 2:
reconstructions = [
r
for ix_r, r in enumerate(reconstructions)
if len(r.shots) >= min_ims or ix_r < 2
]
if len(reconstructions) < n_recs:
print(
"Kept {}/{} reconstructions (min images: {})".format(
len(reconstructions),
n_recs,
min_ims,
)
)
output = []
for rec in reconstructions:
shots = sorted(
rec.shots.values(), key=lambda x: (x.metadata.capture_time.value, x.id)
)
output.append([shot.id for shot in shots])
return output
def group_by_reconstruction(args, groups_from_sequence_database):
all_recs_shots = load_shots_from_reconstructions(
args.dataset, min_ims=args.min_images_in_reconstruction
)
map_key_to_skey = {}
if groups_from_sequence_database:
for skey, keys in groups_from_sequence_database.items():
for k in keys:
map_key_to_skey[k] = skey
groups = defaultdict(list)
for ix_rec, rec_shots in enumerate(all_recs_shots):
for key in rec_shots:
if key in map_key_to_skey:
group_key = f"REC#{ix_rec}_{map_key_to_skey[key]}"
else:
group_key = f"REC#{ix_rec}"
groups[group_key].append(key)
return groups
def group_images(args):
"""
Groups the images to be shown in different windows/views
If group_by_reconstruction is set, each reconstruction will have its own view
If there is a sequence_database file, each sequence will have its own view
If group_by_reconstruction is set and there is a sequence_database file, the views
will be split by sequence and also by reconstruction. For example, if there is a camera
rig (4 sequences) that is reconstructed into two disjoint reconstructions, you end up
with 8 views.
"""
groups_from_sequence_database = load_sequence_database_from_file(
args.dataset,
args.sequence_file,
skip_missing=not args.strict_missing,
)
if args.group_by_reconstruction:
return group_by_reconstruction(args, groups_from_sequence_database)
else:
# We only group by sequence key
if groups_from_sequence_database is None:
print(
f"No sequence database file at {args.sequence_file}"
" and --group-by-reconstruction is disabled. Quitting"
)
exit()
return groups_from_sequence_database
def find_suitable_cad_paths(path_cad_files, path_dataset, n_paths: int=6):
if path_cad_files is None:
return []
def latlon_from_meta(path_cad):
path_meta = path_cad.with_suffix(".json")
with open(path_meta) as f:
meta = json.load(f)
return meta["center"]["latitude"], meta["center"]["longitude"]
# Returns the top N cad models sorted by distance to the dataset
path_cad_files = Path(path_cad_files)
cad_files = list(
set(
list(path_cad_files.glob("**/*.FBX"))
+ list(path_cad_files.glob("**/*.fbx"))
)
)
ref = dataset.DataSet(path_dataset).load_reference()
ref_latlon = np.array([ref.lat, ref.lon])
cad_latlons = np.array([latlon_from_meta(path) for path in cad_files])
distances = np.linalg.norm(cad_latlons - ref_latlon, axis=1)
ixs_sort = np.argsort(distances)[:n_paths]
return [cad_files[i] for i in ixs_sort]
def init_ui():
app = Flask(__name__)
parser = get_parser()
args = parser.parse_args()
path = args.dataset
rig_groups = load_rig_assignments(Path(args.dataset))
groups = group_images(args)
image_manager = ImageManager(
groups,
path,
)
gcp_manager = GroundControlPointManager(path)
GUI.Gui(
app,
gcp_manager,
image_manager,
rig_groups,
find_suitable_cad_paths(args.cad, path, 1),
)
return app, args
if __name__ == "__main__":
app, args = init_ui()
app.run(host="::", port=args.port)