agora/shopper_insights_api/src/app.py (104 lines of code) (raw):

"""Flask front end for the shopper-insights video processors.

Exposes HTTP endpoints that start per-camera ``VideoProcessor`` instances,
stream their frames as MJPEG, configure restricted areas, and publish
detection data as JSON and as Prometheus metrics.
"""

import json
import os
import threading
import time

import cv2
from flask import Flask, render_template, Response, request, jsonify
from openvino.runtime import Core, get_version as ov_get_version
from prometheus_client import generate_latest, CONTENT_TYPE_LATEST

from prometheus_metrics import PrometheusMetrics
from video_processor import VideoProcessor

# Constants
FLASK_PORT = int(os.getenv("FLASK_PORT", 5001))
FLASK_DEBUG = os.getenv("FLASK_DEBUG", "false").lower() in ["true", "1", "t"]
PROCESSOR_SKIP_FPS = int(os.getenv("PROCESSOR_SKIP_FPS", 2))
# Read ENABLE_SAVING environment variable
ENABLE_SAVING = os.getenv("ENABLE_SAVING", "True").lower() in ["true", "1", "t"]

app = Flask(__name__)

# Global variables
video_processors = {}  # camera name -> VideoProcessor
ie = Core()

# Bound at module level (not only under ``__main__``) so that ``/metrics``
# also works when this module is imported by a WSGI server. The instance is
# created on first use, see ``_get_prometheus_metrics``.
prometheus_metrics = None

# The app is served with ``threaded=True``; these locks keep concurrent
# requests from creating the same processor / metrics object twice.
_processors_lock = threading.Lock()
_metrics_lock = threading.Lock()


def _processor_request_error(data):
    """Return an error message if *data* cannot be used to obtain a processor.

    Args:
        data: Decoded JSON payload supplied by the client.

    Returns:
        A human-readable message describing the problem, or ``None`` when the
        payload carries everything ``get_or_create_processor`` needs.
    """
    if not isinstance(data, dict):
        return "Request data must be a JSON object"
    camera_name = data.get('cameraName')
    if not isinstance(camera_name, str):
        return "Missing or invalid 'cameraName'"
    # 'video_url' is only read when a new processor has to be created.
    if camera_name not in video_processors and 'video_url' not in data:
        return "Missing 'video_url' for a camera that has not been started"
    return None


def _get_prometheus_metrics():
    """Return the shared ``PrometheusMetrics`` instance, creating it if needed."""
    global prometheus_metrics
    with _metrics_lock:
        if prometheus_metrics is None:
            prometheus_metrics = PrometheusMetrics()
        return prometheus_metrics


def get_or_create_processor(camera_name, data):
    """Return the processor for *camera_name*, creating it on first use.

    Args:
        camera_name: Key identifying the camera in ``video_processors``.
        data: Request payload. ``video_url`` is required when the processor
            does not exist yet; ``debug`` and ``areas`` are optional.

    Returns:
        The ``VideoProcessor`` registered under *camera_name*.

    Raises:
        KeyError: If a new processor is needed and ``video_url`` is missing.
    """
    areas = data.get('areas')
    with _processors_lock:
        processor = video_processors.get(camera_name)
        if processor is None:
            # NOTE(review): the index is the length of the camera *name*, so
            # two cameras with equally long names share an index. This looks
            # unintended (len(video_processors)?) - confirm against
            # VideoProcessor before changing it.
            processor_index = len(camera_name)
            video_url = data['video_url']
            debug = bool(data.get('debug', False))
            processor = VideoProcessor(
                video_url,
                processor_index,
                camera_name,
                PROCESSOR_SKIP_FPS,
                debug,
                enable_saving=ENABLE_SAVING,
            )
            video_processors[camera_name] = processor
        else:
            # Only touch the debug flag when the caller actually sent one.
            if 'debug' in data:
                processor.update_debug(bool(data['debug']))
            if 'areas' in data:
                print(f"Restricted areas for {camera_name}: {data['areas']}")
        if areas:
            processor.set_restricted_area(areas)
    return processor


def generate(data, camera_name):
    """Yield an endless multipart MJPEG stream for *camera_name*.

    Args:
        data: Request payload forwarded to ``get_or_create_processor``.
        camera_name: Camera whose frames are streamed.

    Yields:
        ``bytes`` chunks, each one JPEG image framed for a
        ``multipart/x-mixed-replace; boundary=frame`` response.
    """
    processor = get_or_create_processor(camera_name, data)
    print(f"Starting video feed for {camera_name}")
    while True:
        frame = processor.get_frame()
        jpeg_bytes = None
        if frame is not None:
            encoded_ok, buffer = cv2.imencode('.jpg', frame)
            # imencode reports failure through its first return value.
            if encoded_ok:
                jpeg_bytes = buffer.tobytes()
        if jpeg_bytes is None:
            # Nothing to send yet; back off briefly instead of spinning.
            time.sleep(0.01)
            continue
        yield (b'--frame\r\n'
               b'Content-Type: image/jpeg\r\n\r\n' + jpeg_bytes + b'\r\n')


@app.route('/')
def index():
    """Serve the landing page."""
    return render_template('index.html')


@app.route('/video_feed')
def video_feed():
    """Stream a camera as MJPEG.

    Expects a ``data`` query parameter holding a JSON object with at least
    ``cameraName`` (and ``video_url`` for a camera not started yet).
    Responds with 400 when the parameter is missing or malformed.
    """
    json_str = request.args.get('data')
    if json_str is None:
        return "Missing 'data' parameter in URL", 400
    try:
        data = json.loads(json_str)
    except ValueError:
        return "Invalid JSON format", 400
    # Validate up front: errors raised inside the generator would surface
    # only after the streaming response has already started.
    error = _processor_request_error(data)
    if error is not None:
        return error, 400
    camera_name = data["cameraName"]
    return Response(generate(data, camera_name),
                    mimetype='multipart/x-mixed-replace; boundary=frame')


@app.route('/set_restricted_areas', methods=['POST'])
def set_restricted_areas():
    """Apply the restricted areas in the JSON body to a camera's processor."""
    data = request.json
    if not isinstance(data, dict):
        return jsonify({"error": "Request data must be a JSON object"}), 400
    camera_name = data.get('cameraName')
    if camera_name == "":
        # No camera selected yet; answered softly rather than as an error.
        return jsonify({"message": "Video processor not started"}), 200
    error = _processor_request_error(data)
    if error is not None:
        return jsonify({"error": error}), 400
    # get_or_create_processor applies data['areas'] when it is non-empty.
    get_or_create_processor(camera_name, data)
    return jsonify({"message": "Restricted areas set successfully"}), 200


@app.route('/set_video_source', methods=['POST'])
def set_video_source():
    """Start (or update) the processor for the camera named in the JSON body."""
    data = request.json
    error = _processor_request_error(data)
    if error is not None:
        return jsonify({"error": error}), 400
    camera_name = data.get('cameraName')
    get_or_create_processor(camera_name, data)
    print(f"Video source set to {camera_name}")
    return jsonify({"message": "Video source set successfully"}), 200


@app.route('/status')
def status():
    """Return detection data for one camera (``camera_name``) or for all."""
    camera_name = request.args.get('camera_name', default="")
    if not camera_name:
        return jsonify(get_all_cameras_data_func())
    processor = video_processors.get(camera_name)
    if processor is None:
        return jsonify({"message": "Video processor not found"}), 404
    return jsonify(processor.get_detection_data())


def get_all_cameras_data_func():
    """Return ``{camera key: detection data}`` for every registered processor."""
    # Iterate over a snapshot: other request threads may register new
    # processors while the detection data is being collected.
    return {
        name: processor.get_detection_data()
        for name, processor in tuple(video_processors.items())
    }


@app.route('/metrics')
def metrics():
    """Refresh the Prometheus metrics and return them in exposition format."""
    # Get current detection data for all cameras
    all_camera_data = get_all_cameras_data_func()
    # Update all metrics
    _get_prometheus_metrics().update_metrics(all_camera_data)
    # Generate and return the metrics in Prometheus format
    return Response(generate_latest(), mimetype=CONTENT_TYPE_LATEST)


if __name__ == '__main__':
    print(f"OpenVINO version: {ov_get_version()}")
    print(f"Available devices: {ie.available_devices}")
    # Initialize the metrics
    prometheus_metrics = PrometheusMetrics()
    app.run(host='0.0.0.0', port=FLASK_PORT, debug=FLASK_DEBUG, threaded=True)