def main()

in projects/vision-ai-edge-platform/camera-client/camera_client.py [0:0]


def main():
    """Runs ML inference against camera images."""
    global mqtt_msg

    args = parse_args()

    client_config = {}
    logger = logging.getLogger()
    logger.setLevel(args.log.upper())
    streamhandler = logging.StreamHandler()
    formatter = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    )
    streamhandler.setFormatter(formatter)
    logger.addHandler(streamhandler)
    logger.debug("Main started")

    mqtt_client = None
    cwd = os.path.dirname(os.path.abspath("camera-client.py"))

    payload_struct_thermal = {}
    payload_struct_thermal["device_id"] = args.device_id
    payload_struct_thermal["temp_format"] = args.temp_format

    payload_struct = {}
    payload_struct["device_id"] = args.device_id

    printout = False
    if "print" in args.stdout:
        printout = True

    if args.client_cfg_file:
        if printout:
            logger.info("Loading client config from: %s", args.client_cfg_file)
        client_config = load_client_config(args.client_cfg_file)
        logger.debug("Client config: %s", client_config)

    if args.ml:
        try:
            url_template = client_config["ML_MODEL"].get("url")
            request_body_template = client_config["ML_MODEL"].get(
                "request_body"
            )
        except KeyError:
            if printout:
                logger.error(
                    "ML_MODEL section missing in cfg: %s. Exiting.",
                    args.client_cfg_file,
                )
                exit(1)
        if not url_template or not request_body_template:
            if printout:
                logger.error(
                    "ML_MODEL missing in cfg file: %s. Exiting.",
                    args.client_cfg_file,
                )
                exit(1)
        ml_url = url_template.format(hostname=args.ml_host, port=args.ml_port)
        if printout:
            logger.info("ML URL: %s", ml_url)
            logger.info("ML payload template: %s", request_body_template)

    protobuf = False
    if "protobuf" in args.stdout:
        protobuf = True

    crop = False
    if (
        args.crop_top > -1
        and args.crop_bottom > -1
        and args.crop_left > -1
        and args.crop_right > -1
    ):
        crop = True

    cam = None
    if args.protocol == "genicam":
        cam = edge_camera.GenicamCamera(
            args.address,
            args.device_id,
            os.path.join(cwd, args.gentl),
            logger,
            args.stdout,
            protobuf,
        )
    elif args.protocol == "onvif":
        cam = edge_camera.OnvifCamera(
            args.address,
            args.cam_user,
            args.cam_passwd,
            args.device_id,
            logger,
            args.stdout,
            protobuf,
        )
    elif args.protocol == "rtsp":
        cam = edge_camera.RtspCamera(
            args.address,
            args.cam_user,
            args.cam_passwd,
            args.device_id,
            logger,
            args.stdout,
            protobuf,
        )
    elif args.protocol == "usb":
        cam = edge_camera.UsbCamera(
            args.address, args.device_id, logger, args.stdout, protobuf
        )
    elif args.protocol == "file" and args.mode != "batch":
        cam = edge_camera.FileCamera(
            args.address, args.device_id, logger, args.stdout, protobuf
        )
    elif args.protocol == "file" and args.mode == "batch":
        pass
    else:
        if printout:
            logger.info("Unsupported protocol: %s. Exiting", args.protocol)
        exit(1)

    if args.scan:
        exit_gracefully(cam, mqtt_client)

    if args.health_check:
        cam.open()
        health = cam.health_check()
        if printout:
            logger.info("Camera health check result: %s", health)
        elif protobuf:
            edge_camera.print_without_eol(health)
        cam.close()
        exit(0)

    if args.cfg_write:
        if hasattr(cam, "set_properties"):
            write_config(printout, logger, cam, args.cfg_write_file)
        else:
            if printout:
                logger.info(
                    "Camera type: %s does not support cfg, exiting",
                    args.protocol,
                )
            exit(1)

    if args.cfg_read:
        if hasattr(cam, "get_properties"):
            read_config(printout, logger, cam, args.cfg_read_file)
        else:
            if printout:
                logger.info(
                    "Camera type: %s does not support cfg, exiting",
                    args.protocol,
                )
            exit(1)

    if args.img_write and args.mode != "none" and printout:
        logger.info("Saving images to: %s", args.img_write_path)

    if args.raw_write and args.mode != "none" and printout:
        logger.info("Saving raw data to: %s", args.raw_write_path)

    if args.ml and args.mode != "none" and printout:
        logger.info("Passing camera images to the ML model container")
        if args.ml_write:
            logger.info(
                "Writing inference result JSON files to: %s", args.ml_write_path
            )

    if args.mqtt:
        if printout:
            logger.info(
                "Starting MQTT client. Publishing results to: %s",
                args.mqtt_topic_results,
            )
        mqtt_client = mqtt.Client(userdata=args.mqtt_topic_commands)
        mqtt_client.on_connect = mqtt_on_connect
        mqtt_client.on_message = mqtt_on_message
        mqtt_client.connect_async(args.mqtt_host, args.mqtt_port, 60)
        mqtt_client.loop_start()

    if "none" not in args.pubsub:
        os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = args.credentials
        publisher = pubsub_v1.PublisherClient(
            client_info=client_info.ClientInfo(user_agent=_SOLUTION_USER_AGENT)
        )
        publisher.topic_path(args.project_id, args.topic_id)
        if printout:
            logger.info("Created Pub/Sub client")

    byte_io = io.BytesIO()
    loop = False
    single = False
    interactive = False
    continuous = False
    mqtt_sub = False
    batch = False
    buffer = None
    sensor_array = None
    height = None
    width = None
    if args.mode != "none":
        loop = True
    if args.mode == "single":
        single = True
    elif args.mode == "interactive":
        interactive = True
    elif args.mode == "continuous":
        continuous = True
    elif args.mode == "mqtt_sub":
        mqtt_sub = True
    elif args.mode == "batch":
        batch = True

    count = 0
    total_count = args.count

    if batch:
        batch_files = process_files(args.address)
        total_count = len(batch_files)
        logger.debug("Batch input file list: %s: %s", total_count, batch_files)

    my_mqtt_topics = [
        args.mqtt_topic_commands,
        f"{args.mqtt_topic_commands}/{args.device_id}",
    ]
    if printout:
        logger.info("Listening to commands on MQTT topics: %s", my_mqtt_topics)

    if loop:
        if not batch:
            cam.open()

        while loop:
            if batch:
                cam = edge_camera.FileCamera(
                    batch_files.pop(),
                    args.device_id,
                    logger,
                    args.stdout,
                    protobuf,
                )
                cam.open()

            if mqtt_sub:
                if not mqtt_msg:
                    continue
                elif mqtt_topic not in my_mqtt_topics:
                    mqtt_msg = None
                    if printout:
                        logger.debug("Ignoring unknown topic: %s", mqtt_topic)
                    continue
                elif mqtt_msg == "quit" or mqtt_msg == "exit":
                    if printout:
                        logger.info("Quit command received via MQTT..")
                    exit_gracefully(cam, mqtt_client)
                elif mqtt_msg.startswith("get_frame"):
                    if printout:
                        logger.info(
                            "MQTT command: %s: %s", mqtt_topic, mqtt_msg
                        )
                    if mqtt_msg.startswith("get_frame,file://"):
                        if args.protocol == "file":
                            new_file_path = mqtt_msg[len("get_frame,file://") :]
                            cam.set_address(new_file_path)
                            logger.info(
                                "Image file address set to: %s", new_file_path
                            )
                        else:
                            logger.error(
                                "Setting address in MQTT get_frame command is "
                                "only supported for file protocol"
                            )
                    mqtt_msg = None
                else:
                    if printout:
                        logger.info(
                            "Unknown MQTT command received: %s", mqtt_msg
                        )
                    mqtt_msg = None
                    continue
            now = datetime.datetime.now()
            timestamp_rfc3339 = rfc3339(now)
            timestamp_sec = now.timestamp()
            ts = int(timestamp_sec * 1000)

            if "genicam" in args.protocol and args.raw_write:
                sensor_array, height, width, buffer = cam.get_raw()
                if not height:
                    if printout:
                        logger.info("Genicam raw imager acquiry failed")
                    continue

                if args.temp_format == "C":
                    sensor_array_thermal = sensor_array * 0.4 - 273.15
                elif args.temp_format == "F":
                    sensor_array_thermal = (
                        sensor_array * 0.4 - 273.15
                    ) * 9 / 5 + 32
                else:
                    # default to Kelvin
                    sensor_array_thermal = sensor_array * 0.4
                payload_struct_thermal["temp_avg"] = np.average(
                    sensor_array_thermal
                )
                payload_struct_thermal["temp_min"] = sensor_array_thermal.min()
                payload_struct_thermal["temp_max"] = sensor_array_thermal.max()
                payload_struct_thermal["temp_array"] = sensor_array.tolist()
                payload_struct_thermal["ts"] = timestamp_rfc3339
                payload_byte_array = create_pubsub_payload(
                    payload_struct_thermal, logger
                )
                filename = "{}{}-{}.bin".format(
                    args.raw_write_path, args.device_id, ts
                )
                with open(filename, "wb") as f:
                    f.write(payload_byte_array)
                    f.close()

            if args.img_write or args.ml or protobuf:
                if args.stream_delay and not continuous:
                    if printout:
                        logger.info(
                            "Stream_delay %sms before capturing frame",
                            args.stream_delay,
                        )
                    time.sleep(args.stream_delay / 1000.0)
                if protobuf:
                    pil_img = None
                    if args.protocol == "genicam":
                        edge_camera.print_without_eol(
                            cam.get_frame(args, sensor_array, height, width)
                        )
                    else:
                        edge_camera.print_without_eol(cam.get_frame())
                elif args.protocol == "genicam":
                    pil_img = cam.get_frame(args, sensor_array, height, width)
                else:
                    pil_img = cam.get_frame()

                if pil_img:
                    if crop:
                        pil_img = pil_img.crop(
                            (
                                args.crop_left,
                                args.crop_top,
                                args.crop_right,
                                args.crop_bottom,
                            )
                        )

                    if args.width > 0 and args.height > 0:
                        w, h = pil_img.size
                        if w != args.width or h != args.height:
                            pil_img = pil_img.resize((args.width, args.height))

                    if args.img_write:
                        filename = "{}{}-{}.png".format(
                            args.img_write_path, args.device_id, ts
                        )
                        if printout and not continuous:
                            logger.info("Writing image: %s", filename)
                        pil_img.save(filename)
                        if continuous and args.count > 0 and printout:
                            logger.info(
                                "Images written: %s/%s", count + 1, args.count
                            )

                    if args.ml:
                        byte_io = io.BytesIO()
                        pil_img.save(byte_io, "PNG")
                        byte_io.seek(0)
                        results = predict(
                            printout,
                            logger,
                            byte_io.read(),
                            ml_url,
                            request_body_template,
                        )
                        byte_io.seek(0)

                        if args.ml_write:
                            filename = "{}{}-{}.json".format(
                                args.ml_write_path, args.device_id, ts
                            )
                            logger.debug(
                                "Writing ML results JSON: %s", filename
                            )
                            with open(filename, "w", encoding="utf-8") as f:
                                f.write(json.dumps(results))
                            f.close()
                        else:
                            if printout:
                                logger.info(results)

                    if "results" in args.pubsub and args.ml:
                        payload_struct["ts"] = timestamp_rfc3339
                        payload_struct["file_id"] = ts
                        payload_struct["results"] = results
                        payload_byte_array = create_pubsub_payload(
                            payload_struct, logger
                        )
                        transmit_pubsub(
                            printout,
                            logger,
                            publisher,
                            args,
                            payload_byte_array,
                        )

                    if args.mqtt and args.ml:
                        transmit_mqtt(
                            printout,
                            logger,
                            mqtt_client,
                            results,
                            args.mqtt_topic_results,
                        )

            time.sleep(args.sleep)
            if buffer:
                cam.queue(buffer)
            if single:
                loop = False
            elif (continuous or batch) and total_count > 0:
                count += 1
                if count >= total_count:
                    loop = False
            elif interactive:
                key = input(
                    "Press Enter to process another image.. Q to quit: "
                )
                if "q" in key or "Q" in key:
                    loop = False
    exit_gracefully(cam, mqtt_client)