in projects/vision-ai-edge-camera-client/edge_camera.py [0:0]
def __init__(self, address, device_id, logger, stdout, protobuf):
    """Scans for USB cameras and optionally opens one of them.

    Runs a camera scan, reports the results according to the output mode
    and, when an address is supplied, records that camera in the camera
    protobuf and opens an OpenCV capture on it.

    Args:
      address: str, USB device address. When empty, no capture is opened.
      device_id: str, Unique identifier for the camera instance.
      logger: logging.Logger, Logger object for recording messages.
      stdout: str, Output mode for messages. 'print' lists the scan results
        as plain text; a value containing 'protobuf' emits the serialized
        discovery message when no address is given.
      protobuf: bool, Whether to use protobuf for output messages.
    """
    # Plain constructor arguments.
    self.address = address
    self.device_id = device_id
    self.logger = logger
    self.stdout = stdout
    self.printout = stdout == "print"
    self.protobuf = protobuf

    # State populated further down (scan results, capture handle, protos).
    self.cameras_list = []
    self.capture = None
    self.cam_proto = cameras_pb2.Camera()
    self.health_proto = cameras_pb2.CameraHealthCheckResult()
    self.frame_proto = cameras_pb2.CameraFrameResult()

    # Discover cameras and build the discovery message from the results.
    self.cameras_list = self.scan()
    self.discovery_proto = self._discovery_results(self.cameras_list)

    if self.printout:
        row_template = "Address: {} | Model: {} | # of Properties: {}"
        print("USB cameras found:")
        for found in self.cameras_list:
            print(
                row_template.format(
                    found["address"],
                    found["model"],
                    len(found["properties"]),
                )
            )
    elif "protobuf" in self.stdout and not self.address:
        self.logger.debug("Printing scan results in protobuf")
        # NOTE(review): SerializeToString() yields bytes; decoding them as
        # UTF-8 presumably relies on the payload being valid UTF-8 -
        # confirm against what print_without_eol's consumer expects.
        serialized = self.discovery_proto.SerializeToString()
        print_without_eol(serialized.decode("utf-8"))

    if self.address:
        # Describe the requested device as a USB stream on the camera proto.
        usb_stream = cameras_pb2.Camera.Stream()
        usb_stream.protocol = cameras_pb2.Camera.Stream.PROTOCOL_USB
        usb_stream.address = self.address
        self.cam_proto.streams.append(usb_stream)

        # Copy make/model from every scan entry with a matching address.
        for found in self.cameras_list:
            if self.address != found["address"]:
                continue
            self.cam_proto.make = found["make"]
            self.cam_proto.model = found["model"]

        self.logger.debug(
            "Opening capture connection to: {}".format(self.address)
        )
        self.capture = cv2.VideoCapture(self.address)
        self.logger.debug(self.cam_proto)