projects/vision-ai-edge-camera-client/edge_camera.py (759 lines of code) (raw):

# Copyright 2024 Google LLC # # Licensed under the Apache License, Version 2.0 (the 'License'); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an 'AS IS' BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # pylint: disable=W1405 # ruff: noqa """Edge camera helper functions. This module provides a set of camera classes for interacting with different types of cameras, including: - **FileCamera:** Reads image files as a camera source. - **RtspCamera:** Connects to RTSP streams using OpenCV. - **OnvifCamera:** Extends RTSP with ONVIF support for discovery and configuration. - **UsbCamera:** Connects to USB cameras using OpenCV. - **GenicamCamera:** Connects to Genicam/GigE Vision cameras using the harvesters library. Each camera class implements common methods for: - **Opening and closing connections:** `open()`, `close()` - **Acquiring frames:** `get_frame()` - **Health checks:** `health_check()` - **Configuration management (for configurable cameras):** `get_property()`, `set_property()`, `get_properties()`, `set_properties()`, `scan()` This module also provides helper functions for: - **Printing without end-of-line:** `print_without_eol()` - **Creating protobuf messages:** `cameras_pb2` This module is designed to be used in conjunction with the `camera_client.py` script, which provides a command-line interface for interacting with cameras and running ML inference. """ import abc import io import os import threading import urllib import warnings import cameras_pb2 import cv2 from harvesters.core import Harvester import numpy as np import onvif import PIL from PIL import Image import wsdiscovery from wsdiscovery.discovery import ThreadedWSDiscovery as WSDiscovery warnings.filterwarnings("ignore", category=DeprecationWarning) def print_without_eol(mystr): """Prints a string without an end-of-line character. Args: mystr: The string to print. """ print(mystr, end="") class BasicCamera(abc.ABC): """Abstract basic camera definition. Abstract definition of a generic camera class. No implementations. Attributes: address: str device_id: str """ address: str = NotImplemented device_id: str = NotImplemented @abc.abstractmethod def open(self): """Opens the camera connection.""" raise NotImplementedError() @abc.abstractmethod def close(self): """Closes the camera connection.""" raise NotImplementedError() @abc.abstractmethod def get_frame(self): """Gets a frame from the camera.""" raise NotImplementedError() @abc.abstractmethod def health_check(self): """Checks if the camera is working normally.""" raise NotImplementedError() class ConfigurableCamera(BasicCamera): """Abstract configurable camera definition. Abstract definition of a configurable camera class. Similar to BasicCamera, with additional abstract methods, for camera config control and scanning. """ @abc.abstractmethod def get_property(self): """Gets a configuration parameter's value from the camera.""" raise NotImplementedError() @abc.abstractmethod def set_property(self): """Sets a configuration parameter's value on the camera.""" raise NotImplementedError() @abc.abstractmethod def get_properties(self): """Gets all configurations from the camera.""" raise NotImplementedError() @abc.abstractmethod def set_properties(self): """Sets all configurations on the camera.""" raise NotImplementedError() @abc.abstractmethod def scan(self): """Scans for connected cameras.""" raise NotImplementedError() class FileCamera(BasicCamera): """File type camera class. Implements a camera class based on reading input files. Input files are valid image files that can be used as a camera, when the actual camera cannot be integrated with. Attributes: address: str device_id: str logger: logging.Logger stdout: str protobuf: bool cam_proto: cameras_pb2.Camera health_proto: cameras_pb2.CameraHealthCheckResult frame_proto: cameras_pb2.CameraFrameResult printout: bool image: PIL.Image """ def __init__(self, address, device_id, logger, stdout, protobuf): """Initializes the instance based on input image file path. Args: address: str device_id: str logger: logging.Logger stdout: str protobuf: bool """ self.address = address self.device_id = device_id self.logger = logger self.stdout = stdout self.protobuf = protobuf self.cam_proto = cameras_pb2.Camera() self.health_proto = cameras_pb2.CameraHealthCheckResult() self.frame_proto = cameras_pb2.CameraFrameResult() self.printout = self.stdout == "print" self.image = None if self.printout: print("Processing image file: {}".format(self.address)) if self.address: stream_proto = cameras_pb2.Camera.Stream() stream_proto.protocol = cameras_pb2.Camera.Stream.PROTOCOL_FILE stream_proto.address = self.address self.cam_proto.streams.append(stream_proto) self.cam_proto.make = "File" self.cam_proto.model = "System" self.logger.debug( "Opening capture connection to: {}".format(self.address) ) self.logger.debug(self.cam_proto) def open(self): """Opens the source image as a connected camera.""" if self.image: self.close() try: if self.printout: self.logger.debug("Opening image: {}".format(self.address)) self.image = Image.open(self.address, mode="r") except FileNotFoundError as e1: if self.printout: print(f"Error: {e1!r}") self.logger.error("Cannot find image as camera") exit(1) except PIL.UnidentifiedImageError as e2: if self.printout: print(e2) self.logger.error("Cannot open image as camera") exit(1) except IOError as e3: if self.printout: print(e3) self.logger.error("Cannot open image as camera") exit(1) def close(self): """Closes the source image file.""" if self.image: if self.printout: self.logger.debug("Closing image: {}".format(self.address)) self.image.close() self.image = None def get_frame(self): """Reads the source image as a frame. Returns: PIL.Image: image frame self.frame_proto: protobuf frame """ try: if self.printout: self.logger.debug("Reading image: {}".format(self.address)) self.image.load() except IOError as e1: if self.printout: print(f"Error: {e1!r}") return None except MemoryError as e2: if self.printout: print(e2) return None if not self.protobuf: return self.image buf = io.BytesIO() self.image.save(buf, format="PNG") buf.seek(0) self.frame_proto.png_frame = buf.read() self.frame_proto.camera.CopyFrom(self.cam_proto) return self.frame_proto.SerializeToString().decode("utf-8") def health_check(self): """Checks if the image can be read. Returns: bool: True if the image can be read, False otherwise. str: Protobuf string if protobuf is True, None otherwise. """ image = self.get_frame() if not image: if self.printout: print("Error reading frame.") return False if self.protobuf: self.health_proto.camera.CopyFrom(self.cam_proto) self.health_proto.check_result = False return self.health_proto.SerializeToString().decode("utf-8") return False if self.protobuf: self.health_proto.camera.CopyFrom(self.cam_proto) self.health_proto.check_result = bool(image) return self.health_proto.SerializeToString().decode("utf-8") else: return bool(image) def set_address(self, new_address): """Sets a new address (file path) for the camera. Args: new_address: The new file path to use as the camera source. Returns: bool: True if the address is set successfully, False otherwise. """ self.close() self.image = None self.address = new_address self.open() class RtspCamera(BasicCamera): """RTSP type camera class. Implements a camera class based on RTSP streaming. Uses OpenCV for interacting with the camera. Attributes: address: str user: str passwd: str device_id: str logger: logging.Logger stdout: str protobuf: bool cameras_list: list image: PIL.Image event: threading.Event cam_proto: cameras_pb2.Camera health_proto: cameras_pb2.CameraHealthCheckResult frame_proto: cameras_pb2.CameraFrameResult printout: bool capture: cv2.VideoCapture thread: threading.Thread last_return_code: bool last_frame: np.ndarray lock: threading.Lock """ last_return_code = None last_frame = None lock = threading.Lock() def capture_frames(self, event, capture): """Consumes RTSP framebuffer so get_frame gets the latest frame. This method runs in a separate thread and continuously reads frames from the RTSP stream using the provided `cv2.VideoCapture` object. This ensures that the `get_frame` method always retrieves the most recent frame available. Args: event: A `threading.Event` object used to signal the thread to stop. capture: A `cv2.VideoCapture` object for the open RTSP stream. """ while True: if event.is_set(): break try: if capture.isOpened(): with self.lock: self.last_return_code, self.last_frame = capture.read() self.logger.debug("Read frame") except cv2.error: pass def __init__( self, address, user, passwd, device_id, logger, stdout, protobuf ): """Initializes with RTSP address and optional authentication. Args: address: RTSP address of the camera. user: Username for camera authentication (optional). passwd: Password for camera authentication (optional). device_id: Unique identifier for the camera instance. logger: Logger object for recording messages. stdout: Output mode for messages ('print' for stdout, 'protobuf' for protobuf messages). protobuf: Whether to use protobuf for output messages. """ self.address = address self.user = user self.passwd = passwd self.device_id = device_id self.logger = logger self.stdout = stdout self.printout = self.stdout == "print" self.protobuf = protobuf self.cameras_list = [] self.image = None self.event = threading.Event() self.cam_proto = cameras_pb2.Camera() self.health_proto = cameras_pb2.CameraHealthCheckResult() self.frame_proto = cameras_pb2.CameraFrameResult() if self.address: stream_proto = cameras_pb2.Camera.Stream() stream_proto.protocol = cameras_pb2.Camera.Stream.PROTOCOL_RTSP stream_proto.address = self.address self.cam_proto.streams.append(stream_proto) if user: if "@" not in address: self.address = ( f"rtsp://{self.user}:{self.passwd}@" f"{self.address.split('rtsp://')[1]}" ) self.logger.debug(self.cam_proto) if self.printout: print("Using RTSP camera: {}".format(self.address)) try: self.capture = cv2.VideoCapture(self.address) self.thread = threading.Thread( target=self.capture_frames, args=(self.event, self.capture) ) self.thread.start() except cv2.error as e: if self.printout: print(e) def open(self): """Checks if the OpenCV VideoCapture connection is open.""" self.logger.debug("Opening RTSP camera connection") if not self.capture.isOpened(): if self.printout: print("Cannot open camera") self.close() exit(1) def close(self): """Closes OpenCV VideoCapture.""" self.logger.debug("Closing RTSP camera connection") if self.address: self.event.set() self.thread.join() if self.capture: self.capture.release() def get_frame(self): """Gets the latest frame from RTSP stream. Returns: PIL.Image: The latest frame from the RTSP stream as a PIL Image, or None if no frame is available. str: Protobuf string if protobuf is True, None otherwise. """ self.logger.debug( "RTSP camera get_frame return code: {} data type: {}".format( self.last_return_code, type(self.last_frame) ) ) if not self.last_return_code: self.logger.debug("No frame received, looping") while not self.last_return_code: continue pil_image = Image.fromarray( cv2.cvtColor(self.last_frame, cv2.COLOR_BGR2RGB) ) if not self.protobuf: return pil_image buf = io.BytesIO() pil_image.save(buf, format="PNG") buf.seek(0) self.frame_proto.png_frame = buf.read() self.frame_proto.camera.CopyFrom(self.cam_proto) return self.frame_proto.SerializeToString().decode("utf-8") def health_check(self): """Checks if we can get valid frames from RTSP stream. Returns: bool: True if a valid frame can be retrieved, False otherwise. str: Protobuf string with health check result if protobuf is True, None otherwise. """ self.logger.debug("RTSP camera health_check - trying to get an image") image = self.get_frame() if not image: if self.printout: print("Error reading frame from RTSP stream.") return False elif self.protobuf: self.health_proto.camera.CopyFrom(self.cam_proto) self.health_proto.check_result = False return self.health_proto.SerializeToString().decode("utf-8") else: return False if self.protobuf: self.health_proto.camera.CopyFrom(self.cam_proto) if not image: self.health_proto.check_result = False else: self.health_proto.check_result = True return self.health_proto.SerializeToString().decode("utf-8") else: if not image: return False else: return True class OnvifCamera(RtspCamera): """ONVIF type RTSP camera class. Implements a camera class based on RTSP streaming, with additional ONVIF support on the camera. ONVIF adds configuration control and discovery of cameras, as well as their RTSP streams, for more advanced RTSP cameras. Attributes: discovery_proto: CameraDiscoveryResult proto """ def _discovery_results(self, cameras_list): """Returns a cameras_pb2.CameraDiscoveryResult proto of addresses.""" self.logger.debug( "ONVIF Camera _discovery_results. Cameras_list: {}".format( cameras_list ) ) discovery_proto = cameras_pb2.CameraDiscoveryResult() for cam in cameras_list: camera_proto = cameras_pb2.Camera() camera_proto.make = cam["make"] camera_proto.model = cam["model"] for stream in cam["streams"]: stream_proto = cameras_pb2.Camera.Stream() stream_proto.protocol = cameras_pb2.Camera.Stream.PROTOCOL_ONVIF stream_proto.address = stream camera_proto.streams.append(stream_proto) discovery_proto.cameras.append(camera_proto) return discovery_proto def __init__( self, address, user, passwd, device_id, logger, stdout, protobuf ): """Initializes the instance with base RTSP init, plus ONVIF scan. Args: address: RTSP address of the camera. If None, only ONVIF scans. user: Username for camera authentication (optional). passwd: Password for camera authentication (optional). device_id: Unique identifier for the camera instance. logger: Logger object for recording messages. stdout: Output mode for messages ('print' for stdout, 'protobuf' for protobuf messages). protobuf: Whether to use protobuf for output messages. """ super().__init__( address, user, passwd, device_id, logger, stdout, protobuf ) self.cameras_list = self.scan() if self.cameras_list: self.discovery_proto = self._discovery_results(self.cameras_list) else: self.discovery_proto = None if self.address: for cam in self.cameras_list: if self.address in cam["streams"]: self.cam_proto.make = cam["make"] self.cam_proto.model = cam["model"] self.logger.debug(self.cam_proto) if self.printout and self.cameras_list: print("ONVIF cameras found:") for cam in self.cameras_list: print( "Make: {} | Model: {} | Addresses: {}".format( cam["make"], cam["model"], cam["streams"] ) ) elif self.stdout == "protobuf" and not self.address: self.logger.debug("Printing scan results in protobuf") print_without_eol( self.discovery_proto.SerializeToString().decode("utf-8") ) def scan(self): """Scans the LAN for ONVIF cameras and their RTSP streams. Returns: list: A list of dictionaries, where each dictionary represents a discovered ONVIF camera. The camera dictionary contains the keys 'make', 'model', and 'streams'. The 'streams' key contains a list of RTSP addresses for the camera. Returns None if no cameras are found. """ scanned = [] found = [] ttype = wsdiscovery.QName( "http://www.onvif.org/ver10/network/wsdl", "NetworkVideoTransmitter" ) scope = wsdiscovery.Scope("onvif://www.onvif.org/") warnings.filterwarnings( action="ignore", message="unclosed", category=ResourceWarning ) if self.printout: print("Discovering ONVIF cameras on the network...") wsd = WSDiscovery() wsd.start() services = wsd.searchServices(types=[ttype], scopes=[scope]) for service in services: uuid = service.getEPR().split(":")[2] parsed = urllib.parse.urlparse(service.getXAddrs()[0]) addr, port = parsed.netloc.split(":") scanned.append({"uuid": uuid, "addr": addr, "port": port}) wsd.stop() if scanned: wdsl_path = onvif.__path__[0].replace("onvif", "wsdl") if self.printout: print("ONVIF cameras found: {}".format(scanned)) print("Querying found ONVIF cameras for RTSP URIs..") for scan in scanned: try: self.logger.debug(scan) cam = {} mycam = onvif.ONVIFCamera( scan["addr"], scan["port"], self.user, self.passwd, wdsl_path, ) device_info = mycam.devicemgmt.GetDeviceInformation() self.logger.debug(device_info) cam["make"] = device_info["Manufacturer"] cam["model"] = device_info["Model"] cam["streams"] = [] media_service = mycam.create_media_service() profiles = media_service.GetProfiles() for profile in profiles: token = profile.token stream_uri = media_service.GetStreamUri( { "StreamSetup": { "Stream": "RTP-Unicast", "Transport": "UDP", }, "ProfileToken": token, } ) rtsp_uri = stream_uri["Uri"] cam["streams"].append(rtsp_uri) found.append(cam) except onvif.ONVIFError as e: if self.printout: print("Error querying ONVIF camera: {}".format(e)) else: if self.printout: print("No ONVIF cameras discovered") return None self.logger.debug(found) return found class UsbCamera(ConfigurableCamera): """USB type camera class. Implements a camera class based on USB protocol. USB cameras are directly connected to the host USB bus. Uses OpenCV for interacting with the camera. Attributes: address: str. device_id: str. logger: logging.Logger. stdout: str. protobuf: bool. cameras_list: list. capture: cv2.VideoCapture. cam_proto: cameras_pb2.Camera. health_proto: cameras_pb2.CameraHealthCheckResult. frame_proto: cameras_pb2.CameraFrameResult. printout: bool. discovery_proto: cameras_pb2.CameraDiscoveryResult. """ def _discovery_results(self, cameras_list): """Returns a cameras_pb2.CameraDiscoveryResult proto of addresses. Args: cameras_list: A list of dicts, where each dict represents a discovered ONVIF camera. The camera dict contains the keys 'make', 'model', and 'streams'. The 'streams' key contains a list of RTSP addresses for the camera. Returns: cameras_pb2.CameraDiscoveryResult: Protobuf message containing information about discovered cameras and their streams. """ self.logger.debug( "USB Camera _discovery_results. Cameras_list: {}".format( cameras_list ) ) discovery_proto = cameras_pb2.CameraDiscoveryResult() for cam in cameras_list: camera_proto = cameras_pb2.Camera() camera_proto.make = cam["make"] camera_proto.model = cam["model"] stream_proto = cameras_pb2.Camera.Stream() stream_proto.protocol = cameras_pb2.Camera.Stream.PROTOCOL_USB stream_proto.address = cam["address"] camera_proto.streams.append(stream_proto) discovery_proto.cameras.append(camera_proto) return discovery_proto def __init__(self, address, device_id, logger, stdout, protobuf): """Initializes the instance with USB device address. Args: address: str, USB device address. device_id: str, Unique identifier for the camera instance. logger: logging.Logger, Logger object for recording messages. stdout: str, Output mode for messages ('print' for stdout, 'protobuf' for protobuf messages). protobuf: bool, Whether to use protobuf for output messages. """ self.address = address self.device_id = device_id self.logger = logger self.stdout = stdout self.printout = self.stdout == "print" self.protobuf = protobuf self.cameras_list = [] self.capture = None self.cam_proto = cameras_pb2.Camera() self.health_proto = cameras_pb2.CameraHealthCheckResult() self.frame_proto = cameras_pb2.CameraFrameResult() self.cameras_list = self.scan() self.discovery_proto = self._discovery_results(self.cameras_list) if self.printout: print("USB cameras found:") for cam in self.cameras_list: print( "Address: {} | Model: {} | # of Properties: {}".format( cam["address"], cam["model"], len(cam["properties"]) ) ) elif "protobuf" in self.stdout and not self.address: self.logger.debug("Printing scan results in protobuf") print_without_eol( self.discovery_proto.SerializeToString().decode("utf-8") ) if self.address: stream_proto = cameras_pb2.Camera.Stream() stream_proto.protocol = cameras_pb2.Camera.Stream.PROTOCOL_USB stream_proto.address = self.address self.cam_proto.streams.append(stream_proto) for cam in self.cameras_list: if self.address == cam["address"]: self.cam_proto.make = cam["make"] self.cam_proto.model = cam["model"] self.logger.debug( "Opening capture connection to: {}".format(self.address) ) self.capture = cv2.VideoCapture(self.address) self.logger.debug(self.cam_proto) def open(self): """Checks if VideoCapture is open to the camera.""" if not self.capture.isOpened(): if self.printout: self.logger.error("Cannot open camera") exit(1) def close(self): """Closes VideoCapture.""" if self.capture: self.capture.release() def get_frame(self): """Gets a frame from the camera.""" return_value, frame = self.capture.read() if not return_value: return None pil_image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)) if not self.protobuf: return pil_image buf = io.BytesIO() pil_image.save(buf, format="PNG") buf.seek(0) self.frame_proto.png_frame = buf.read() self.frame_proto.camera.CopyFrom(self.cam_proto) return self.frame_proto.SerializeToString().decode("utf-8") def get_property(self, key): """Gets a configuration parameter's value from the camera. Args: key: str, The name of the configuration parameter to retrieve. Returns: The value of the configuration parameter, or None if an error occurs. """ try: value = self.capture.get(getattr(cv2, key)) except cv2.error: return None else: return value def set_property(self, key, value): """Sets a configuration parameter's value on the camera.""" try: self.capture.set(getattr(cv2, key), value) except cv2.error: if self.printout: print( "Failed to write camera configuration parameter: {}".format( key ) ) def get_properties(self): """Gets all configurations from the camera. Returns: dict: A dictionary containing the camera's configuration parameters and their values. """ props = {} all_props = dir(cv2) for prop in all_props: if "CAP_PROP" in prop: props[prop] = self.get_property(prop) return props def set_properties(self, configs): """Sets all configurations from the camera. Args: configs: dict, A dictionary containing the camera's configuration parameters and their desired values. """ for key in configs: if self.printout: print( "Writing config to the camera: {} = {}".format( key, configs[key] ) ) self.set_property(key, configs[key]) def scan(self): """Scans the local server USB bus for connected cameras. Returns: list: A list of dictionaries, where each dictionary represents a discovered USB camera. The camera dictionary contains the keys 'address', 'make', 'model', and 'properties'. Returns None if no cameras are found. """ found = [] warnings.filterwarnings(action="ignore") if self.printout: print("Discovering USB cameras...") devs = os.listdir("/dev/") for dev in devs: if "video" in dev: cam_name_path = "/sys/class/video4linux/" + dev + "/name" dev = "/dev/" + dev capture = cv2.VideoCapture(dev) if capture.isOpened(): cam = {} cam["address"] = dev props = self.get_properties() if props: cam["properties"] = props try: f = open(cam_name_path, "r", encoding="utf-8") except cv2.error as e: self.logger.debug( "Cannot open file %s for reading: %s", cam_name_path, e, ) name = f.read() name = name.strip() cam["make"] = name cam["model"] = name found.append(cam) capture.release() f.close() self.logger.debug("cameras_list: {}".format(found)) return found def health_check(self): """Checks if we can get a valid frame from the camera.""" image = self.get_frame() if not image: if self.printout: print("Error reading frame from camera.") return False elif self.protobuf: self.health_proto.camera.CopyFrom(self.cam_proto) self.health_proto.check_result = False return self.health_proto.SerializeToString().decode("utf-8") else: return False if self.protobuf: self.health_proto.camera.CopyFrom(self.cam_proto) if not image: self.health_proto.check_result = False else: self.health_proto.check_result = True return self.health_proto.SerializeToString().decode("utf-8") else: if not image: return False else: return True class GenicamCamera(ConfigurableCamera): """Genicam type camera class. Implements a camera class based on Genicam/GigE Vision protocol. Genicam cameras support discovery and config control. Implemented using harvesters Genicam module. Attributes: address: str. device_id: str. gentl: str. logger: logging.Logger. stdout: str. protobuf: bool. cameras_list: list. img_acquirer: harvesters.core.ImageAcquirer. cam_proto: cameras_pb2.Camera. health_proto: cameras_pb2.CameraHealthCheckResult. frame_proto: cameras_pb2.CameraFrameResult. printout: bool. harvester: harvesters.core.Harvester. discovery_proto: cameras_pb2.CameraDiscoveryResult. """ def _discovery_results(self, cameras_list): """Returns a cameras_pb2.CameraDiscoveryResult proto of addresses. Returns: cameras_pb2.CameraDiscoveryResult: Protobuf message containing information about discovered cameras and their addresses. """ self.logger.debug( "Genicam Camera _discovery_results. Cameras_list: {}".format( cameras_list ) ) discovery_proto = cameras_pb2.CameraDiscoveryResult() for cam in cameras_list: camera_proto = cameras_pb2.Camera() camera_proto.make = cam["make"] camera_proto.model = cam["model"] stream_proto = cameras_pb2.Camera.Stream() stream_proto.protocol = cameras_pb2.Camera.Stream.PROTOCOL_GENICAM stream_proto.address = cam["address"] camera_proto.streams.append(stream_proto) discovery_proto.cameras.append(camera_proto) return discovery_proto def __init__(self, address, device_id, gentl, logger, stdout, protobuf): """Initializes the instance with Genicam address, from 0 onwards. Args: address: str, Genicam device address, from 0 onwards. device_id: str, Unique identifier for the camera instance. gentl: str, Path to the GenTL producer file. logger: logging.Logger, Logger object for recording messages. stdout: str, Output mode for messages ('print' for stdout, 'protobuf' for protobuf messages). protobuf: bool, Whether to use protobuf for output messages. """ self.address = address self.device_id = device_id self.gentl = gentl self.logger = logger self.stdout = stdout self.printout = self.stdout == "print" self.protobuf = protobuf self.cameras_list = [] self.img_acquirer = None self.cam_proto = cameras_pb2.Camera() self.health_proto = cameras_pb2.CameraHealthCheckResult() self.frame_proto = cameras_pb2.CameraFrameResult() self.harvester = Harvester(logger=logger) self.harvester.add_file(gentl) self.harvester.timeout_for_update = 5000 self.cameras_list = self.scan() self.discovery_proto = self._discovery_results(self.cameras_list) if self.printout: print("Genicam cameras found:") for cam in self.cameras_list: print( "Address: {} | Make: {} | Model: {}".format( cam["address"], cam["make"], cam["model"] ) ) elif "protobuf" in self.stdout: self.logger.debug("Printing scan results in protobuf") print_without_eol( self.discovery_proto.SerializeToString().decode("utf-8") ) if self.address: stream_proto = cameras_pb2.Camera.Stream() stream_proto.protocol = cameras_pb2.Camera.Stream.PROTOCOL_GENICAM stream_proto.address = self.address self.cam_proto.streams.append(stream_proto) for cam in self.cameras_list: if self.address == cam["address"]: self.cam_proto.make = cam["make"] self.cam_proto.model = cam["model"] self.logger.debug( "Opening capture connection to: {}".format(self.address) ) self.img_acquirer = self.harvester.create( search_key=int(self.address) ) self.img_acquirer.start(run_as_thread=True) self.logger.debug(self.cam_proto) def open(self): """Checks if the harvesters Image Acquirer is working normally.""" if self.printout: print( "Image Acquirer status: valid:{} armed:{} acquiring:{}".format( self.img_acquirer.is_valid(), self.img_acquirer.is_armed(), self.img_acquirer.is_acquiring(), ) ) def close(self): """Closes the harvesters Image Acquirer.""" if self.img_acquirer: self.img_acquirer.stop() self.harvester.reset() def get_frame(self, args, one_d, height, width): """Gets a sensor array dump from the camera and pre-processes. Args: args: program arguments. one_d: one dimensional array. height: image height. width: image width. Returns: PIL.Image: pre-processed image frame. self.frame_proto: protobuf frame. """ if not height: one_d, height, width, _ = self.get_raw() if not height: if self.printout: print("Genicam image acquiry failed") return None two_d = one_d.reshape(height, width) if args.range_max == 0: two_d_bits = np.interp(two_d, (two_d.min(), two_d.max()), (0, 254)) else: two_d_bits = np.interp( two_d, (args.range_min, args.range_max), (0, 254) ) if args.mode != "continuous" and self.printout: print( "Mapping from raw to 8bit: min: {}->{}, max: {}->{}".format( two_d.min(), two_d_bits.min(), two_d.max(), two_d_bits.max() ) ) pil_image = Image.fromarray( two_d_bits.astype(np.uint8), mode="L" ).convert("RGB") if not self.protobuf: return pil_image buf = io.BytesIO() pil_image.save(buf, format="PNG") buf.seek(0) self.frame_proto.png_frame = buf.read() self.frame_proto.camera.CopyFrom(self.cam_proto) return self.frame_proto.SerializeToString().decode("utf-8") def get_property(self, n): """Gets a configuration parameter's value from the camera. Args: n: str, The name of the configuration parameter to retrieve. Returns: The value of the configuration parameter, or None if an error occurs. """ try: value = getattr(self.img_acquirer.remote_device.node_map, n).value except AttributeError: return None except TypeError: return None return value def set_property(self, key, value): """Sets a configuration parameter's value to the camera.""" try: setattr(self.img_acquirer.remote_device.node_map, key, value) except AttributeError: if self.printout: print( "Failed to write camera configuration parameter: {}".format( key ) ) except TypeError: if self.printout: print( "Failed to write camera configuration parameter: {}".format( key ) ) except ValueError: if self.printout: print( "Failed to write camera configuration parameter: {}".format( key ) ) def get_properties(self): """Gets all configurations from the camera. Returns: dict: A dictionary containing the camera's configuration parameters and their values. """ props = {} node_map = dir(self.img_acquirer.remote_device.node_map) for node in node_map: if "__" not in node: value = self.get_property(node) if value: if isinstance(value, str): props[node] = '"' + value + '"' else: props[node] = value return props def set_properties(self, configs): """Sets all configurations on the camera. Args: configs: dict, A dictionary containing the camera's configuration parameters and their desired values. """ for key in configs: if self.printout: print( "Writing config to the camera: {} = {}".format( key, configs[key] ) ) self.set_property(key, configs[key]) def scan(self): """Scans the LAN for Genicam/GigE Vision cameras.""" found = [] if self.printout: print("Discovering Genicam cameras on the network...") self.harvester.update() for i in range(len(self.harvester.device_info_list)): device = self.harvester.device_info_list[i] device_dict = device.property_dict cam = {} cam["address"] = str(i) cam["properties"] = {} cam["make"] = device_dict["vendor"] cam["model"] = device_dict["model"] found.append(cam) self.logger.debug("cameras_list: {}".format(found)) return found def get_raw(self): """Gets a raw sensor array dump from the e.g Thermal camera. Returns: tuple: A tuple containing the raw sensor data, height, width, and buffer object. Returns None if no data is available. """ buffer = self.img_acquirer.try_fetch(timeout=3) if buffer: component = buffer.payload.components[0] return (component.data, component.height, component.width, buffer) else: return def queue(self, buffer): """Queues the Genicam frames receive buffer.""" buffer.queue() def health_check(self): """Checks if we can get data from the camera. Returns: bool: True if data can be retrieved, False otherwise. str: Protobuf containing health check result if protobuf is True, None otherwise. """ _, harvester, _, _ = self.get_raw() if not harvester: if self.printout: print("Error reading data from camera.") return False elif self.protobuf: self.health_proto.camera.CopyFrom(self.cam_proto) self.health_proto.check_result = False return self.health_proto.SerializeToString().decode("utf-8") else: return False if self.protobuf: self.health_proto.camera.CopyFrom(self.cam_proto) if not harvester: self.health_proto.check_result = False else: self.health_proto.check_result = True return self.health_proto.SerializeToString().decode("utf-8") else: if not harvester: return False else: return True