annotation_gui_gcp/lib/views/web_view.py (68 lines of code) (raw):
import abc
import json
import time
from queue import Queue
from flask import Response, jsonify, render_template, request
distinct_colors = [
"#46f0f0",
"#f032e6",
"#bcf60c",
"#fabebe",
"#008080",
"#9a6324",
"#fffac8",
"#800000",
"#aaffc3",
"#808000",
"#3cb44b",
"#ffe119",
"#4363d8",
"#f58231",
"#911eb4",
"#000075",
"#808080",
"#ffffff",
"#000000",
]
class WebView(abc.ABC):
def __init__(self, main_ui, web_app, route_prefix):
self.main_ui = main_ui
self.app = web_app
self.eventQueue = Queue()
self.latlons = main_ui.image_manager.load_latlons()
self.register_routes(route_prefix)
self.route_prefix = route_prefix
@abc.abstractclassmethod
def sync_to_client(self):
pass
@abc.abstractclassmethod
def process_client_message(self, data):
pass
def template_name(self):
class_name = type(self).__name__
return class_name
def register_routes(self, route):
def send_main_page():
template = self.template_name()
return render_template(f"{template}.html", class_name=template)
self.app.add_url_rule(route, route + "_index", send_main_page)
def postdata():
data = request.get_json()
# Do something with the event received from the client
if data["event"] != "init":
self.process_client_message(data)
# Send a sync event back to the client to reflect the changed state (all views)
self.main_ui.sync_to_client()
return jsonify(success=True)
self.app.add_url_rule(
route + "/postdata", route + "_postdata", postdata, methods=["POST"]
)
# Stream for server -> client updates through Server-Sent Events
def stream():
def eventStream():
while True:
msg = self.eventQueue.get() # blocks until a new message arrives
yield msg
return Response(eventStream(), mimetype="text/event-stream")
self.app.add_url_rule(route + "/stream", route + "_stream", stream)
def send_sse_message(self, data, event_type="sync"):
# Send to the client
data["time"] = time.time()
sse_string = f"event: {event_type}\ndata: {json.dumps(data)}\n\n"
self.eventQueue.put(sse_string)