in projects/vision-ai-edge-platform/camera-client/camera_client.py [0:0]
def main():
"""Runs ML inference against camera images."""
global mqtt_msg
args = parse_args()
client_config = {}
logger = logging.getLogger()
logger.setLevel(args.log.upper())
streamhandler = logging.StreamHandler()
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
streamhandler.setFormatter(formatter)
logger.addHandler(streamhandler)
logger.debug("Main started")
mqtt_client = None
cwd = os.path.dirname(os.path.abspath("camera-client.py"))
payload_struct_thermal = {}
payload_struct_thermal["device_id"] = args.device_id
payload_struct_thermal["temp_format"] = args.temp_format
payload_struct = {}
payload_struct["device_id"] = args.device_id
printout = False
if "print" in args.stdout:
printout = True
if args.client_cfg_file:
if printout:
logger.info("Loading client config from: %s", args.client_cfg_file)
client_config = load_client_config(args.client_cfg_file)
logger.debug("Client config: %s", client_config)
if args.ml:
try:
url_template = client_config["ML_MODEL"].get("url")
request_body_template = client_config["ML_MODEL"].get(
"request_body"
)
except KeyError:
if printout:
logger.error(
"ML_MODEL section missing in cfg: %s. Exiting.",
args.client_cfg_file,
)
exit(1)
if not url_template or not request_body_template:
if printout:
logger.error(
"ML_MODEL missing in cfg file: %s. Exiting.",
args.client_cfg_file,
)
exit(1)
ml_url = url_template.format(hostname=args.ml_host, port=args.ml_port)
if printout:
logger.info("ML URL: %s", ml_url)
logger.info("ML payload template: %s", request_body_template)
protobuf = False
if "protobuf" in args.stdout:
protobuf = True
crop = False
if (
args.crop_top > -1
and args.crop_bottom > -1
and args.crop_left > -1
and args.crop_right > -1
):
crop = True
cam = None
if args.protocol == "genicam":
cam = edge_camera.GenicamCamera(
args.address,
args.device_id,
os.path.join(cwd, args.gentl),
logger,
args.stdout,
protobuf,
)
elif args.protocol == "onvif":
cam = edge_camera.OnvifCamera(
args.address,
args.cam_user,
args.cam_passwd,
args.device_id,
logger,
args.stdout,
protobuf,
)
elif args.protocol == "rtsp":
cam = edge_camera.RtspCamera(
args.address,
args.cam_user,
args.cam_passwd,
args.device_id,
logger,
args.stdout,
protobuf,
)
elif args.protocol == "usb":
cam = edge_camera.UsbCamera(
args.address, args.device_id, logger, args.stdout, protobuf
)
elif args.protocol == "file" and args.mode != "batch":
cam = edge_camera.FileCamera(
args.address, args.device_id, logger, args.stdout, protobuf
)
elif args.protocol == "file" and args.mode == "batch":
pass
else:
if printout:
logger.info("Unsupported protocol: %s. Exiting", args.protocol)
exit(1)
if args.scan:
exit_gracefully(cam, mqtt_client)
if args.health_check:
cam.open()
health = cam.health_check()
if printout:
logger.info("Camera health check result: %s", health)
elif protobuf:
edge_camera.print_without_eol(health)
cam.close()
exit(0)
if args.cfg_write:
if hasattr(cam, "set_properties"):
write_config(printout, logger, cam, args.cfg_write_file)
else:
if printout:
logger.info(
"Camera type: %s does not support cfg, exiting",
args.protocol,
)
exit(1)
if args.cfg_read:
if hasattr(cam, "get_properties"):
read_config(printout, logger, cam, args.cfg_read_file)
else:
if printout:
logger.info(
"Camera type: %s does not support cfg, exiting",
args.protocol,
)
exit(1)
if args.img_write and args.mode != "none" and printout:
logger.info("Saving images to: %s", args.img_write_path)
if args.raw_write and args.mode != "none" and printout:
logger.info("Saving raw data to: %s", args.raw_write_path)
if args.ml and args.mode != "none" and printout:
logger.info("Passing camera images to the ML model container")
if args.ml_write:
logger.info(
"Writing inference result JSON files to: %s", args.ml_write_path
)
if args.mqtt:
if printout:
logger.info(
"Starting MQTT client. Publishing results to: %s",
args.mqtt_topic_results,
)
mqtt_client = mqtt.Client(userdata=args.mqtt_topic_commands)
mqtt_client.on_connect = mqtt_on_connect
mqtt_client.on_message = mqtt_on_message
mqtt_client.connect_async(args.mqtt_host, args.mqtt_port, 60)
mqtt_client.loop_start()
if "none" not in args.pubsub:
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = args.credentials
publisher = pubsub_v1.PublisherClient(
client_info=client_info.ClientInfo(user_agent=_SOLUTION_USER_AGENT)
)
publisher.topic_path(args.project_id, args.topic_id)
if printout:
logger.info("Created Pub/Sub client")
byte_io = io.BytesIO()
loop = False
single = False
interactive = False
continuous = False
mqtt_sub = False
batch = False
buffer = None
sensor_array = None
height = None
width = None
if args.mode != "none":
loop = True
if args.mode == "single":
single = True
elif args.mode == "interactive":
interactive = True
elif args.mode == "continuous":
continuous = True
elif args.mode == "mqtt_sub":
mqtt_sub = True
elif args.mode == "batch":
batch = True
count = 0
total_count = args.count
if batch:
batch_files = process_files(args.address)
total_count = len(batch_files)
logger.debug("Batch input file list: %s: %s", total_count, batch_files)
my_mqtt_topics = [
args.mqtt_topic_commands,
f"{args.mqtt_topic_commands}/{args.device_id}",
]
if printout:
logger.info("Listening to commands on MQTT topics: %s", my_mqtt_topics)
if loop:
if not batch:
cam.open()
while loop:
if batch:
cam = edge_camera.FileCamera(
batch_files.pop(),
args.device_id,
logger,
args.stdout,
protobuf,
)
cam.open()
if mqtt_sub:
if not mqtt_msg:
continue
elif mqtt_topic not in my_mqtt_topics:
mqtt_msg = None
if printout:
logger.debug("Ignoring unknown topic: %s", mqtt_topic)
continue
elif mqtt_msg == "quit" or mqtt_msg == "exit":
if printout:
logger.info("Quit command received via MQTT..")
exit_gracefully(cam, mqtt_client)
elif mqtt_msg.startswith("get_frame"):
if printout:
logger.info(
"MQTT command: %s: %s", mqtt_topic, mqtt_msg
)
if mqtt_msg.startswith("get_frame,file://"):
if args.protocol == "file":
new_file_path = mqtt_msg[len("get_frame,file://") :]
cam.set_address(new_file_path)
logger.info(
"Image file address set to: %s", new_file_path
)
else:
logger.error(
"Setting address in MQTT get_frame command is "
"only supported for file protocol"
)
mqtt_msg = None
else:
if printout:
logger.info(
"Unknown MQTT command received: %s", mqtt_msg
)
mqtt_msg = None
continue
now = datetime.datetime.now()
timestamp_rfc3339 = rfc3339(now)
timestamp_sec = now.timestamp()
ts = int(timestamp_sec * 1000)
if "genicam" in args.protocol and args.raw_write:
sensor_array, height, width, buffer = cam.get_raw()
if not height:
if printout:
logger.info("Genicam raw imager acquiry failed")
continue
if args.temp_format == "C":
sensor_array_thermal = sensor_array * 0.4 - 273.15
elif args.temp_format == "F":
sensor_array_thermal = (
sensor_array * 0.4 - 273.15
) * 9 / 5 + 32
else:
# default to Kelvin
sensor_array_thermal = sensor_array * 0.4
payload_struct_thermal["temp_avg"] = np.average(
sensor_array_thermal
)
payload_struct_thermal["temp_min"] = sensor_array_thermal.min()
payload_struct_thermal["temp_max"] = sensor_array_thermal.max()
payload_struct_thermal["temp_array"] = sensor_array.tolist()
payload_struct_thermal["ts"] = timestamp_rfc3339
payload_byte_array = create_pubsub_payload(
payload_struct_thermal, logger
)
filename = "{}{}-{}.bin".format(
args.raw_write_path, args.device_id, ts
)
with open(filename, "wb") as f:
f.write(payload_byte_array)
f.close()
if args.img_write or args.ml or protobuf:
if args.stream_delay and not continuous:
if printout:
logger.info(
"Stream_delay %sms before capturing frame",
args.stream_delay,
)
time.sleep(args.stream_delay / 1000.0)
if protobuf:
pil_img = None
if args.protocol == "genicam":
edge_camera.print_without_eol(
cam.get_frame(args, sensor_array, height, width)
)
else:
edge_camera.print_without_eol(cam.get_frame())
elif args.protocol == "genicam":
pil_img = cam.get_frame(args, sensor_array, height, width)
else:
pil_img = cam.get_frame()
if pil_img:
if crop:
pil_img = pil_img.crop(
(
args.crop_left,
args.crop_top,
args.crop_right,
args.crop_bottom,
)
)
if args.width > 0 and args.height > 0:
w, h = pil_img.size
if w != args.width or h != args.height:
pil_img = pil_img.resize((args.width, args.height))
if args.img_write:
filename = "{}{}-{}.png".format(
args.img_write_path, args.device_id, ts
)
if printout and not continuous:
logger.info("Writing image: %s", filename)
pil_img.save(filename)
if continuous and args.count > 0 and printout:
logger.info(
"Images written: %s/%s", count + 1, args.count
)
if args.ml:
byte_io = io.BytesIO()
pil_img.save(byte_io, "PNG")
byte_io.seek(0)
results = predict(
printout,
logger,
byte_io.read(),
ml_url,
request_body_template,
)
byte_io.seek(0)
if args.ml_write:
filename = "{}{}-{}.json".format(
args.ml_write_path, args.device_id, ts
)
logger.debug(
"Writing ML results JSON: %s", filename
)
with open(filename, "w", encoding="utf-8") as f:
f.write(json.dumps(results))
f.close()
else:
if printout:
logger.info(results)
if "results" in args.pubsub and args.ml:
payload_struct["ts"] = timestamp_rfc3339
payload_struct["file_id"] = ts
payload_struct["results"] = results
payload_byte_array = create_pubsub_payload(
payload_struct, logger
)
transmit_pubsub(
printout,
logger,
publisher,
args,
payload_byte_array,
)
if args.mqtt and args.ml:
transmit_mqtt(
printout,
logger,
mqtt_client,
results,
args.mqtt_topic_results,
)
time.sleep(args.sleep)
if buffer:
cam.queue(buffer)
if single:
loop = False
elif (continuous or batch) and total_count > 0:
count += 1
if count >= total_count:
loop = False
elif interactive:
key = input(
"Press Enter to process another image.. Q to quit: "
)
if "q" in key or "Q" in key:
loop = False
exit_gracefully(cam, mqtt_client)