projects/vision-ai-edge-camera-client/camera_client.py (825 lines of code) (raw):

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# pylint: disable=E1120
# ruff: noqa

"""Camera client utility.

Lets users connect to cameras with different protocols, configure the
cameras and acquire data from them, run ML inference against the camera
frames, forward the results to Google Cloud and expose them via MQTT.

Typical usage example:

  python3 camera_client.py --protocol usb --address /dev/video0
      --device_id usbcam1 --img_write --mode interactive
"""

import argparse
import base64
import configparser
import datetime
import io
import json
import logging
import os
import sys
import time
import warnings

import construct
import numpy as np
import paho.mqtt.client as mqtt
import puremagic
import requests
from google.api_core.gapic_v1 import client_info
from google.cloud import pubsub_v1
from rfc3339 import rfc3339

import edge_camera

_SOLUTION_USER_AGENT = "cloud-solutions/vision-ai-edge-camera-client-v1"

# Pause between polls of the MQTT command variable in mqtt_sub mode, so the
# main loop does not spin a CPU core while waiting for a command.
_MQTT_POLL_INTERVAL_SECONDS = 0.01

warnings.filterwarnings("ignore", category=DeprecationWarning)

# Last received MQTT command payload and its topic. Written by
# mqtt_on_message() (MQTT client thread) and consumed by main().
mqtt_msg = None
mqtt_topic = None


def transmit_mqtt(printout, logger, mqtt_client, results, topic):
    """Transmits ML result to local MQTT topic.

    Uses MQTT paho client to transmit ML results to a local network MQTT
    topic. The results are serialized with json.dumps().

    Args:
      printout: printing messages to stdout.
      logger: main logger client.
      mqtt_client: local network MQTT connection client.
      results: ML inference results from VIAI model.
      topic: MQTT topic to publish the results to.
    """
    if printout:
        logger.info("Transmitting ML inference results to local MQTT")
    mqtt_client.publish(topic, json.dumps(results))
    if printout:
        logger.info("Local MQTT transmit complete")


def transmit_pubsub(printout, logger, publisher, args, data):
    """Transmits payload to Cloud Pub/Sub.

    Transmits a payload to Cloud Pub/Sub. Either image or ML results.
    Cloud project and Pub/Sub topic from args.

    Args:
      printout: printing messages to stdout. When False, the publish future
        is not waited on.
      logger: main logger client.
      publisher: Pub/Sub client.
      args: main program command line arguments.
      data: payload to transmit. Bytes, or an object with a save() method
        when args.pubsub contains "image".
    """
    if printout:
        logger.info("Transmitting data to Cloud Pub/Sub")
    topic = f"projects/{args.project_id}/topics/{args.topic_id}"
    if "image" in args.pubsub:
        # Image payloads are serialized to PNG bytes before publishing.
        img_byte_arr = io.BytesIO()
        data.save(img_byte_arr, format="PNG")
        data = img_byte_arr.getvalue()
    future = publisher.publish(topic, data, type=args.pubsub)
    if printout:
        logger.info(future.result())
        logger.info("Published data to Pub/Sub topic %s", topic)


def read_config(printout, logger, cam, cfg_file):
    """Queries camera for its runtime configurations and saves them to file.

    For camera types that support configuration management, uses the cam
    class get_properties() method, to query camera's current configs.
    Writes one "parameter = value" line per property to the output file.

    Args:
      printout: printing messages to stdout.
      logger: main logger client.
      cam: camera object.
      cfg_file: queried configurations output file.
    """
    if printout:
        logger.info("Querying camera runtime cfg and saving to: %s", cfg_file)
    props = cam.get_properties()
    logger.debug("Camera properties: %s", props)
    with open(cfg_file, "w", encoding="utf-8") as f:
        for name, value in props.items():
            f.write(f"{name} = {value}\n")


def create_pubsub_payload(results, logger):
    """Creates a binary payload compatible with Cloud Pub/Sub.

    Receives a dict containing ML inference results, or a thermal camera
    pre-processed data dict. For ML results, creates a JSON dump, for
    thermal data, uses Python construct to create a custom binary payload.

    Args:
      results: payload dict. Recognized by the presence of a "results" key
        (ML results) or a "temp_array" key (thermal data).
      logger: Logging component.

    Returns:
      payload bytes, or None if input data format not recognized.
    """
    if "results" in results:
        return json.dumps(results).encode("utf-8")

    if "temp_array" in results:
        payload_struct = construct.Struct(
            "device_id" / construct.PascalString(construct.VarInt, "utf-8"),
            "ts" / construct.PascalString(construct.VarInt, "utf-8"),
            "temp_format" / construct.PascalString(construct.VarInt, "utf-8"),
            "temp_avg" / construct.Half,
            "temp_min" / construct.Half,
            "temp_max" / construct.Half,
            "temp_array" / construct.GreedyRange(construct.Short),
        )
        field_names = (
            "device_id",
            "ts",
            "temp_format",
            "temp_avg",
            "temp_min",
            "temp_max",
            "temp_array",
        )
        # A missing field raises KeyError, same as explicit indexing would.
        return payload_struct.build({name: results[name] for name in field_names})

    logger.error("Unknown input data format")
    return None


def write_config(printout, logger, cam, cfg_file):
    """Writes desired configurations to the camera.

    For camera types that support configuration management, uses the cam
    class set_properties() method, to write or update the camera's runtime
    configs. Reads "parameter = value" pairs from the input text file.

    Values are converted as follows: "True"/"False" become bools, values
    containing a double quote become strings (quotes removed), values
    containing "." or "e" become floats, everything else becomes an int.

    Args:
      printout: printing messages to stdout.
      logger: main logger client.
      cam: camera object.
      cfg_file: target configurations input file.

    Raises:
      SystemExit: with code 1 if the file cannot be read.
      ValueError: if a non-blank line is not in "key = value" form or its
        value cannot be converted to a number.
    """
    if printout:
        logger.info("Reading config file: %s", cfg_file)
    try:
        # Read everything up front so the handle is closed even if parsing
        # a row fails below.
        with open(cfg_file, "r", encoding="utf-8") as f:
            rows = f.readlines()
    except OSError:
        if printout:
            logger.error("Error reading file")
        sys.exit(1)

    configs = {}
    for row in rows:
        if not row.strip():
            # Tolerate blank lines, e.g. a trailing newline at end of file.
            continue
        # Split on the first separator only, so values may contain " = ".
        key, value = row.split(" = ", 1)
        value = value.rstrip()
        if value == "True":
            configs[key] = True
        elif value == "False":
            configs[key] = False
        elif '"' in value:
            configs[key] = value.replace('"', "")
        elif "." in value or "e" in value:
            configs[key] = float(value)
        else:
            configs[key] = int(value)

    logger.debug("Writing configs to camera: %s", configs)
    cam.set_properties(configs)


def exit_gracefully(cam, mqtt_client):
    """Closes camera and MQTT connections and exits gracefully.

    Closes all connections gracefully and exits with code 0.

    Args:
      cam: camera object, or None if no camera was created (for example
        file protocol in batch mode before the first file is loaded).
      mqtt_client: local network MQTT client, or None.

    Raises:
      SystemExit: always, with code 0.
    """
    if cam is not None:
        cam.close()
    if mqtt_client:
        mqtt_client.loop_stop()
    sys.exit(0)


def int_map_range(x, in_min, in_max, out_min, out_max):
    """Maps an input value from one value range to another, as int.

    Used for thermal data pre-processing, for example to change
    microbolometer values from 14-bit float accuracy to 8-bit int.

    Args:
      x: input value.
      in_min: input value range min.
      in_max: input value range max. Must differ from in_min.
      out_min: new value range min.
      out_max: new value range max.

    Returns:
      int of the input value, in the new value range (truncated toward 0).

    Raises:
      ZeroDivisionError: if in_min equals in_max.
    """
    return int((x - in_min) * (out_max - out_min) / (in_max - in_min) + out_min)


def create_request_body(image_data, request_body_template):
    """Creates the request body to perform API calls.

    Parses the JSON template and substitutes the base64-encoded image into
    every string that contains the "{encoded_string}" placeholder, at any
    nesting depth of dicts and lists.

    Args:
      image_data: The input image as bytes.
      request_body_template (str): Body template with placeholders.

    Returns:
      A JSON format string of the request body.
    """
    encoded_string = base64.b64encode(image_data).decode("utf-8")
    placeholder = "{encoded_string}"

    def fill(node):
        """Returns node with the placeholder substituted, recursively."""
        if isinstance(node, str):
            if placeholder in node:
                return node.format(encoded_string=encoded_string)
            return node
        if isinstance(node, dict):
            # Only existing keys are reassigned, so the dict size is stable
            # while iterating.
            for key, value in node.items():
                node[key] = fill(value)
        elif isinstance(node, list):
            # Strings that are direct list items are substituted too.
            for index, item in enumerate(node):
                node[index] = fill(item)
        return node

    request_body = fill(json.loads(request_body_template))
    return json.dumps(request_body)


def predict(printout, logger, image_data, ml_url, request_body_template):
    """Predicts results on the input image using the given ML service URL.

    Args:
      printout: printing messages to stdout.
      logger: main logger client.
      image_data: The input image.
      ml_url: The URL of the ML model service.
      request_body_template: The request body template with placeholders.

    Returns:
      The parsed JSON response. None if the HTTP request failed, or an
      empty string if the response body was not valid JSON.
    """
    request_body = create_request_body(image_data, request_body_template)
    logger.debug("Sending image to ML URL: %s", ml_url)
    try:
        response = requests.post(ml_url, data=request_body)
        logger.debug("Response received: %s", response)
    except requests.exceptions.RequestException as e:
        if printout:
            logger.error("Error posting image to model service port: %s", e)
        return None

    json_response = ""
    try:
        json_response = response.json()
    except requests.exceptions.JSONDecodeError:
        if printout:
            # Response.text is a property, not a method.
            logger.error("ML response not in JSON format: %s", response.text)
    return json_response


def mqtt_on_connect(client, userdata, unused_arg2, rc):
    """Subscribes to the MQTT topic.

    MQTT connect callback. Subscribes to the topic, after the MQTT broker
    connection has been opened.

    Args:
      client: MQTT client.
      userdata: MQTT userdata, contains topic to subscribe to.
      unused_arg2: MQTT flags, not used.
      rc: MQTT return code, only printed.
    """
    print(f"Local network MQTT connected with result code {rc}")
    print(f"Subscribing to MQTT topic: {userdata}/#")
    ret = client.subscribe(userdata + "/#", qos=0)
    print(f"MQTT subscription result code: {ret}")


def mqtt_on_message(unused_arg1, unused_arg2, msg):
    """Receives an MQTT message.

    MQTT message received callback. Updates global variables with the
    received data payload and topic. The MQTT client runs in a separate
    thread, thus global variables are used to make the payload visible to
    the main function.

    Args:
      unused_arg1: MQTT client, not used.
      unused_arg2: MQTT userdata, not used.
      msg: received message.
    """
    global mqtt_msg
    global mqtt_topic
    mqtt_msg = msg.payload.decode("utf-8")
    mqtt_topic = msg.topic


def process_files(directory):
    """Recursively searches a directory for files of a recognized type.

    Args:
      directory: The directory to search.

    Returns:
      A list of full paths to the recognized files within the directory
      and its subdirectories.
    """
    batch_files = []
    for entry in os.listdir(directory):
        full_path = os.path.join(directory, entry)
        if os.path.isdir(full_path):
            # Collect the files found in the subdirectory as well.
            batch_files.extend(process_files(full_path))
        else:
            # NOTE(review): confirm that the installed puremagic exposes
            # `magic.from_file`; upstream documents `puremagic.from_file`.
            image_type = puremagic.magic.from_file(full_path)
            if image_type is not None:
                batch_files.append(full_path)
    return batch_files


def load_client_config(config_file):
    """Loads the app configuration from specified file.

    Args:
      config_file (str): Path to the configuration file.

    Returns:
      A configparser.ConfigParser populated from the file. A missing or
      unreadable file yields an empty parser, per ConfigParser.read().
    """
    client_config = configparser.ConfigParser()
    client_config.read(config_file)
    return client_config


def _str_to_bool(value):
    """Converts a command line string to a bool, for use as argparse type.

    Plain `type=bool` treats every non-empty string, including "False", as
    True, so an explicit parser is needed.

    Args:
      value: the raw command line value.

    Returns:
      The parsed boolean.

    Raises:
      argparse.ArgumentTypeError: if the value is not a recognized boolean.
    """
    if isinstance(value, bool):
        return value
    normalized = value.strip().lower()
    if normalized in ("true", "t", "yes", "y", "1", "on"):
        return True
    if normalized in ("false", "f", "no", "n", "0", "off"):
        return False
    raise argparse.ArgumentTypeError(
        "Expected a boolean value (true/false), got: {!r}".format(value)
    )


def parse_args():
    """Parses arguments.

    Returns:
      Parsed arguments.
    """
    parser = argparse.ArgumentParser()

    parser.add_argument(
        "--log",
        default="warning",
        choices=["debug", "info", "warning", "error", "critical"],
        help="Provide logging level. Example --log debug, default=warning",
    )
    parser.add_argument(
        "--protocol",
        default="genicam",
        choices=["genicam", "onvif", "rtsp", "usb", "file"],
        help="Camera or image source connectivity method",
    )
    parser.add_argument("--device_id", help="Camera ID string", default="1")
    parser.add_argument(
        "--address",
        default=None,
        type=str,
        help="Camera address to connect to. For Genicam, integer from 0-n",
    )
    parser.add_argument(
        "--cam_user", default="", type=str, help="Camera access username"
    )
    parser.add_argument(
        "--cam_passwd", default="", type=str, help="Camera access password"
    )
    parser.add_argument(
        "--gentl",
        default="GenTL/FLIR_GenTL_Ubuntu_20_04_x86_64.cti",
        type=str,
        help="Camera Gen_TL file path",
    )
    parser.add_argument(
        "--background_acquisition",
        help="True: GenTL background acquisition. False: Buffer from camera",
        default=True,
        type=_str_to_bool,
    )
    parser.add_argument(
        "--mode",
        default="none",
        choices=[
            "none",
            "single",
            "continuous",
            "interactive",
            "mqtt_sub",
            "batch",
        ],
        help="Imaging acquisition mode. None for camera config read or write",
    )
    parser.add_argument(
        "--width",
        default=0,
        type=int,
        help="Image width in pixels. Resized to this value if needed.",
    )
    parser.add_argument(
        "--height",
        default=0,
        type=int,
        help="Image height in pixels. Resized to this value if needed.",
    )
    parser.add_argument(
        "--count",
        default=0,
        type=int,
        help="Number of images to generate. 0 for indefinite.",
    )
    parser.add_argument(
        "--sleep",
        type=int,
        default=0,
        help="Sleep interval in seconds between loops. Default: 0",
    )
    parser.add_argument(
        "--pubsub",
        default="none",
        choices=["none", "results"],
        help="Whether to transmit ML inference results or none to Pub/Sub",
    )
    parser.add_argument(
        "--topic_id",
        default="camera-integration-telemetry",
        help="GCP Pub/Sub topic name",
    )
    parser.add_argument(
        "--ml",
        action="store_true",
        help="Whether to run ML inference on image feed",
    )
    parser.add_argument(
        "--raw_write",
        action="store_true",
        help="Whether to save raw IR sensor array data",
    )
    parser.add_argument(
        "--raw_write_path",
        default="raw/",
        help="Path where to write binary IR sensor array files",
    )
    parser.add_argument(
        "--img_write", action="store_true", help="Whether to save IR images"
    )
    parser.add_argument(
        "--img_write_path",
        default="images/",
        help="Path where to write IR image files",
    )
    parser.add_argument(
        "--cfg_read",
        action="store_true",
        help="Whether to read camera runtime configs and output to a file",
    )
    parser.add_argument(
        "--cfg_read_file",
        default="camera.cfg",
        help="Name of configuration file to output",
    )
    parser.add_argument(
        "--cfg_write",
        action="store_true",
        help="Whether to write user-defined configs from a file to the camera ",
    )
    parser.add_argument(
        "--cfg_write_file",
        help="Name of user-defined configuration file to apply",
    )
    parser.add_argument(
        "--temp_format",
        default="C",
        choices=["K", "C", "F"],
        help="Radiometric temperature format: K, C or F",
    )
    parser.add_argument(
        "--range_min",
        default=0,
        type=int,
        help="Raw data value to map to 8bit (0-254)",
    )
    parser.add_argument(
        "--range_max",
        default=0,
        type=int,
        help="Raw data value to map to 8bit (0-254)",
    )
    parser.add_argument(
        "--cloud_region", default="us-central1", help="GCP cloud region"
    )
    parser.add_argument("--project_id", help="GCP cloud project ID")
    parser.add_argument(
        "--credentials",
        default="./credentials.json",
        type=str,
        help="Service account JSON key. Default: ./credentials.json",
    )
    parser.add_argument(
        "--ml_host",
        default="127.0.0.1",
        type=str,
        help="IP address where the ML model inference service is running",
    )
    parser.add_argument(
        "--ml_port", default=8602, type=int, help="ML inference service port"
    )
    parser.add_argument(
        "--ml_write",
        action="store_true",
        help="Whether to save ML inference results JSON files",
    )
    parser.add_argument(
        "--ml_write_path",
        default="output/",
        help="Path where to write ML inference result JSON files",
    )
    parser.add_argument(
        "--mqtt",
        action="store_true",
        help="Whether to publish ML results to a local MQTT broker",
    )
    parser.add_argument(
        "--mqtt_host",
        default="localhost",
        type=str,
        help="Local network MQTT connection host address",
    )
    parser.add_argument(
        "--mqtt_port",
        default=1883,
        type=int,
        help="Local network MQTT connection port",
    )
    parser.add_argument(
        "--mqtt_topic_commands",
        default="viai/commands",
        type=str,
        help="MQTT topic where to listen to commands",
    )
    parser.add_argument(
        "--mqtt_topic_results",
        default="viai/results",
        type=str,
        help="MQTT topic where to post ML results",
    )
    parser.add_argument(
        "--health_check",
        action="store_true",
        help="Test camera connection health. Exits with True|False",
    )
    parser.add_argument(
        "--stdout",
        default="print",
        choices=["none", "print", "protobuf"],
        help="STDOUT mode: none, print or protobuf",
    )
    parser.add_argument(
        "--scan",
        action="store_true",
        help="If enabled, scan for cameras of type --protocol only.",
    )
    parser.add_argument(
        "--stream_delay",
        type=int,
        help="Delay in ms before taking frame, to sync with stream latency.",
    )
    parser.add_argument(
        "--crop_left",
        default=-1,
        type=int,
        help="Crop coordinates left edge pixel.",
    )
    parser.add_argument(
        "--crop_top",
        default=-1,
        type=int,
        help="Crop coordinates top edge pixel.",
    )
    parser.add_argument(
        "--crop_right",
        default=-1,
        type=int,
        help="Crop coordinates right edge pixel.",
    )
    parser.add_argument(
        "--crop_bottom",
        default=-1,
        type=int,
        help="Crop coordinates bottom edge pixel.",
    )
    parser.add_argument(
        "--client_cfg_file",
        default=None,
        type=str,
        help="Name of client configuration file to load (ML_MODEL section)",
    )

    return parser.parse_args()


def main():
    """Runs ML inference against camera images.

    Parses the command line, creates the camera object for the selected
    protocol, optionally performs one-off actions (scan, health check,
    config read/write) and then runs the acquisition loop for the selected
    mode. Inside the loop, frames are optionally written to disk, sent to
    the ML service, and the results forwarded to Pub/Sub and/or MQTT.
    Always terminates the process via sys.exit().
    """
    global mqtt_msg

    args = parse_args()
    client_config = {}

    logger = logging.getLogger()
    logger.setLevel(args.log.upper())
    streamhandler = logging.StreamHandler()
    formatter = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    )
    streamhandler.setFormatter(formatter)
    logger.addHandler(streamhandler)
    logger.debug("Main started")

    mqtt_client = None
    publisher = None
    # NOTE(review): abspath() of a bare file name resolves against the
    # current working directory, so this is the process CWD, not the
    # directory of this script - confirm that is intended.
    cwd = os.path.dirname(os.path.abspath("camera-client.py"))

    payload_struct_thermal = {
        "device_id": args.device_id,
        "temp_format": args.temp_format,
    }
    payload_struct = {"device_id": args.device_id}

    printout = "print" in args.stdout

    if args.client_cfg_file:
        if printout:
            logger.info("Loading client config from: %s", args.client_cfg_file)
        client_config = load_client_config(args.client_cfg_file)
        logger.debug("Client config: %s", client_config)

    if args.ml:
        try:
            url_template = client_config["ML_MODEL"].get("url")
            request_body_template = client_config["ML_MODEL"].get(
                "request_body"
            )
        except KeyError:
            if printout:
                logger.error(
                    "ML_MODEL section missing in cfg: %s. Exiting.",
                    args.client_cfg_file,
                )
            sys.exit(1)

        if not url_template or not request_body_template:
            if printout:
                logger.error(
                    "ML_MODEL missing in cfg file: %s. Exiting.",
                    args.client_cfg_file,
                )
            sys.exit(1)

        ml_url = url_template.format(hostname=args.ml_host, port=args.ml_port)
        if printout:
            logger.info("ML URL: %s", ml_url)
            logger.info("ML payload template: %s", request_body_template)

    protobuf = "protobuf" in args.stdout

    # Cropping is enabled only when all four edges were given.
    crop = (
        args.crop_top > -1
        and args.crop_bottom > -1
        and args.crop_left > -1
        and args.crop_right > -1
    )

    cam = None
    if args.protocol == "genicam":
        cam = edge_camera.GenicamCamera(
            args.address,
            args.device_id,
            os.path.join(cwd, args.gentl),
            logger,
            args.stdout,
            protobuf,
        )
    elif args.protocol == "onvif":
        cam = edge_camera.OnvifCamera(
            args.address,
            args.cam_user,
            args.cam_passwd,
            args.device_id,
            logger,
            args.stdout,
            protobuf,
        )
    elif args.protocol == "rtsp":
        cam = edge_camera.RtspCamera(
            args.address,
            args.cam_user,
            args.cam_passwd,
            args.device_id,
            logger,
            args.stdout,
            protobuf,
        )
    elif args.protocol == "usb":
        cam = edge_camera.UsbCamera(
            args.address, args.device_id, logger, args.stdout, protobuf
        )
    elif args.protocol == "file" and args.mode != "batch":
        cam = edge_camera.FileCamera(
            args.address, args.device_id, logger, args.stdout, protobuf
        )
    elif args.protocol == "file" and args.mode == "batch":
        # In batch mode a FileCamera is created per input file in the loop.
        pass
    else:
        if printout:
            logger.info("Unsupported protocol: %s. Exiting", args.protocol)
        sys.exit(1)

    if args.scan:
        exit_gracefully(cam, mqtt_client)

    if args.health_check:
        cam.open()
        health = cam.health_check()
        if printout:
            logger.info("Camera health check result: %s", health)
        elif protobuf:
            edge_camera.print_without_eol(health)
        cam.close()
        sys.exit(0)

    if args.cfg_write:
        if hasattr(cam, "set_properties"):
            write_config(printout, logger, cam, args.cfg_write_file)
        else:
            if printout:
                logger.info(
                    "Camera type: %s does not support cfg, exiting",
                    args.protocol,
                )
            sys.exit(1)

    if args.cfg_read:
        if hasattr(cam, "get_properties"):
            read_config(printout, logger, cam, args.cfg_read_file)
        else:
            if printout:
                logger.info(
                    "Camera type: %s does not support cfg, exiting",
                    args.protocol,
                )
            sys.exit(1)

    if args.img_write and args.mode != "none" and printout:
        logger.info("Saving images to: %s", args.img_write_path)

    if args.raw_write and args.mode != "none" and printout:
        logger.info("Saving raw data to: %s", args.raw_write_path)

    if args.ml and args.mode != "none" and printout:
        logger.info("Passing camera images to the ML model container")
        if args.ml_write:
            logger.info(
                "Writing inference result JSON files to: %s", args.ml_write_path
            )

    if args.mqtt:
        if printout:
            logger.info(
                "Starting MQTT client. Publishing results to: %s",
                args.mqtt_topic_results,
            )
        mqtt_client = mqtt.Client(userdata=args.mqtt_topic_commands)
        mqtt_client.on_connect = mqtt_on_connect
        mqtt_client.on_message = mqtt_on_message
        mqtt_client.connect_async(args.mqtt_host, args.mqtt_port, 60)
        mqtt_client.loop_start()

    if "none" not in args.pubsub:
        os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = args.credentials
        publisher = pubsub_v1.PublisherClient(
            client_info=client_info.ClientInfo(user_agent=_SOLUTION_USER_AGENT)
        )
        publisher.topic_path(args.project_id, args.topic_id)
        if printout:
            logger.info("Created Pub/Sub client")

    byte_io = io.BytesIO()
    buffer = None
    sensor_array = None
    height = None
    width = None

    loop = args.mode != "none"
    single = args.mode == "single"
    interactive = args.mode == "interactive"
    continuous = args.mode == "continuous"
    mqtt_sub = args.mode == "mqtt_sub"
    batch = args.mode == "batch"

    count = 0
    total_count = args.count

    if batch:
        batch_files = process_files(args.address)
        total_count = len(batch_files)
        logger.debug("Batch input file list: %s: %s", total_count, batch_files)
        if not batch_files:
            # Nothing to process: skip the loop instead of popping from an
            # empty list.
            if printout:
                logger.info("No batch input files found in: %s", args.address)
            loop = False

    my_mqtt_topics = [
        args.mqtt_topic_commands,
        f"{args.mqtt_topic_commands}/{args.device_id}",
    ]
    if printout:
        logger.info("Listening to commands on MQTT topics: %s", my_mqtt_topics)

    if loop and not batch:
        cam.open()

    while loop:
        if batch:
            cam = edge_camera.FileCamera(
                batch_files.pop(),
                args.device_id,
                logger,
                args.stdout,
                protobuf,
            )
            cam.open()

        if mqtt_sub:
            if not mqtt_msg:
                # No command pending: yield the CPU briefly, then poll again.
                time.sleep(_MQTT_POLL_INTERVAL_SECONDS)
                continue
            elif mqtt_topic not in my_mqtt_topics:
                mqtt_msg = None
                if printout:
                    logger.debug("Ignoring unknown topic: %s", mqtt_topic)
                continue
            elif mqtt_msg == "quit" or mqtt_msg == "exit":
                if printout:
                    logger.info("Quit command received via MQTT..")
                exit_gracefully(cam, mqtt_client)
            elif mqtt_msg.startswith("get_frame"):
                if printout:
                    logger.info(
                        "MQTT command: %s: %s", mqtt_topic, mqtt_msg
                    )
                if mqtt_msg.startswith("get_frame,file://"):
                    if args.protocol == "file":
                        new_file_path = mqtt_msg[len("get_frame,file://") :]
                        cam.set_address(new_file_path)
                        logger.info(
                            "Image file address set to: %s", new_file_path
                        )
                    else:
                        logger.error(
                            "Setting address in MQTT get_frame command is "
                            "only supported for file protocol"
                        )
                # Consume the command so the frame is captured only once.
                mqtt_msg = None
            else:
                if printout:
                    logger.info(
                        "Unknown MQTT command received: %s", mqtt_msg
                    )
                mqtt_msg = None
                continue

        now = datetime.datetime.now()
        timestamp_rfc3339 = rfc3339(now)
        timestamp_sec = now.timestamp()
        # Millisecond timestamp, used in output file names and as file_id.
        ts = int(timestamp_sec * 1000)

        if "genicam" in args.protocol and args.raw_write:
            sensor_array, height, width, buffer = cam.get_raw()
            if not height:
                if printout:
                    logger.info("Genicam raw imager acquiry failed")
                continue

            if args.temp_format == "C":
                sensor_array_thermal = sensor_array * 0.4 - 273.15
            elif args.temp_format == "F":
                sensor_array_thermal = (
                    sensor_array * 0.4 - 273.15
                ) * 9 / 5 + 32
            else:
                # default to Kelvin
                sensor_array_thermal = sensor_array * 0.4

            payload_struct_thermal["temp_avg"] = np.average(
                sensor_array_thermal
            )
            payload_struct_thermal["temp_min"] = sensor_array_thermal.min()
            payload_struct_thermal["temp_max"] = sensor_array_thermal.max()
            payload_struct_thermal["temp_array"] = sensor_array.tolist()
            payload_struct_thermal["ts"] = timestamp_rfc3339
            payload_byte_array = create_pubsub_payload(
                payload_struct_thermal, logger
            )

            filename = "{}{}-{}.bin".format(
                args.raw_write_path, args.device_id, ts
            )
            with open(filename, "wb") as f:
                f.write(payload_byte_array)

        if args.img_write or args.ml or protobuf:
            if args.stream_delay and not continuous:
                if printout:
                    logger.info(
                        "Stream_delay %sms before capturing frame",
                        args.stream_delay,
                    )
                time.sleep(args.stream_delay / 1000.0)

            if protobuf:
                # In protobuf mode the frame is only printed to stdout; no
                # image is kept for writing or inference.
                pil_img = None
                if args.protocol == "genicam":
                    edge_camera.print_without_eol(
                        cam.get_frame(args, sensor_array, height, width)
                    )
                else:
                    edge_camera.print_without_eol(cam.get_frame())
            elif args.protocol == "genicam":
                pil_img = cam.get_frame(args, sensor_array, height, width)
            else:
                pil_img = cam.get_frame()

            if pil_img:
                if crop:
                    pil_img = pil_img.crop(
                        (
                            args.crop_left,
                            args.crop_top,
                            args.crop_right,
                            args.crop_bottom,
                        )
                    )

                if args.width > 0 and args.height > 0:
                    w, h = pil_img.size
                    if w != args.width or h != args.height:
                        pil_img = pil_img.resize((args.width, args.height))

                if args.img_write:
                    filename = "{}{}-{}.png".format(
                        args.img_write_path, args.device_id, ts
                    )
                    if printout and not continuous:
                        logger.info("Writing image: %s", filename)
                    pil_img.save(filename)
                    if continuous and args.count > 0 and printout:
                        logger.info(
                            "Images written: %s/%s", count + 1, args.count
                        )

                if args.ml:
                    byte_io = io.BytesIO()
                    pil_img.save(byte_io, "PNG")
                    byte_io.seek(0)
                    results = predict(
                        printout,
                        logger,
                        byte_io.read(),
                        ml_url,
                        request_body_template,
                    )
                    byte_io.seek(0)

                    if args.ml_write:
                        filename = "{}{}-{}.json".format(
                            args.ml_write_path, args.device_id, ts
                        )
                        logger.debug(
                            "Writing ML results JSON: %s", filename
                        )
                        with open(filename, "w", encoding="utf-8") as f:
                            f.write(json.dumps(results))
                    else:
                        if printout:
                            logger.info(results)

                if "results" in args.pubsub and args.ml:
                    payload_struct["ts"] = timestamp_rfc3339
                    payload_struct["file_id"] = ts
                    payload_struct["results"] = results
                    payload_byte_array = create_pubsub_payload(
                        payload_struct, logger
                    )
                    transmit_pubsub(
                        printout,
                        logger,
                        publisher,
                        args,
                        payload_byte_array,
                    )

                if args.mqtt and args.ml:
                    transmit_mqtt(
                        printout,
                        logger,
                        mqtt_client,
                        results,
                        args.mqtt_topic_results,
                    )

        time.sleep(args.sleep)

        if buffer:
            # Hand the raw buffer from get_raw() back to the camera.
            cam.queue(buffer)

        if single:
            loop = False
        elif (continuous or batch) and total_count > 0:
            count += 1
            if count >= total_count:
                loop = False
        elif interactive:
            key = input(
                "Press Enter to process another image.. Q to quit: "
            )
            if "q" in key or "Q" in key:
                loop = False

    exit_gracefully(cam, mqtt_client)


if __name__ == "__main__":
    main()