def scan()

in projects/vision-ai-edge-platform/camera-client/edge_camera.py [0:0]


    def scan(self):
        """Scans the LAN for ONVIF cameras and their RTSP streams.

        Returns:
          list: A list of dictionaries, where each dictionary represents a
            discovered ONVIF camera. The camera dictionary contains the
            keys 'make', 'model', and 'streams'. The 'streams' key contains
            a list of RTSP addresses for the camera. Returns None if no
            cameras are found.
        """
        scanned = []
        found = []

        ttype = wsdiscovery.QName(
            "http://www.onvif.org/ver10/network/wsdl", "NetworkVideoTransmitter"
        )
        scope = wsdiscovery.Scope("onvif://www.onvif.org/")

        warnings.filterwarnings(
            action="ignore", message="unclosed", category=ResourceWarning
        )
        if self.printout:
            print("Discovering ONVIF cameras on the network...")
        wsd = WSDiscovery()
        wsd.start()
        services = wsd.searchServices(types=[ttype], scopes=[scope])
        for service in services:
            uuid = service.getEPR().split(":")[2]
            parsed = urllib.parse.urlparse(service.getXAddrs()[0])
            addr, port = parsed.netloc.split(":")
            scanned.append({"uuid": uuid, "addr": addr, "port": port})
        wsd.stop()
        if scanned:
            wdsl_path = onvif.__path__[0].replace("onvif", "wsdl")
            if self.printout:
                print("ONVIF cameras found: {}".format(scanned))
                print("Querying found ONVIF cameras for RTSP URIs..")
            for scan in scanned:
                try:
                    self.logger.debug(scan)
                    cam = {}
                    mycam = onvif.ONVIFCamera(
                        scan["addr"],
                        scan["port"],
                        self.user,
                        self.passwd,
                        wdsl_path,
                    )
                    device_info = mycam.devicemgmt.GetDeviceInformation()
                    self.logger.debug(device_info)
                    cam["make"] = device_info["Manufacturer"]
                    cam["model"] = device_info["Model"]
                    cam["streams"] = []
                    media_service = mycam.create_media_service()
                    profiles = media_service.GetProfiles()
                    for profile in profiles:
                        token = profile.token
                        stream_uri = media_service.GetStreamUri(
                            {
                                "StreamSetup": {
                                    "Stream": "RTP-Unicast",
                                    "Transport": "UDP",
                                },
                                "ProfileToken": token,
                            }
                        )
                        rtsp_uri = stream_uri["Uri"]
                        cam["streams"].append(rtsp_uri)
                    found.append(cam)
                except onvif.ONVIFError as e:
                    if self.printout:
                        print("Error querying ONVIF camera: {}".format(e))
        else:
            if self.printout:
                print("No ONVIF cameras discovered")
            return None
        self.logger.debug(found)
        return found