in projects/vision-ai-edge-platform/camera-client/edge_camera.py [0:0]
def scan(self):
"""Scans the LAN for ONVIF cameras and their RTSP streams.
Returns:
list: A list of dictionaries, where each dictionary represents a
discovered ONVIF camera. The camera dictionary contains the
keys 'make', 'model', and 'streams'. The 'streams' key contains
a list of RTSP addresses for the camera. Returns None if no
cameras are found.
"""
scanned = []
found = []
ttype = wsdiscovery.QName(
"http://www.onvif.org/ver10/network/wsdl", "NetworkVideoTransmitter"
)
scope = wsdiscovery.Scope("onvif://www.onvif.org/")
warnings.filterwarnings(
action="ignore", message="unclosed", category=ResourceWarning
)
if self.printout:
print("Discovering ONVIF cameras on the network...")
wsd = WSDiscovery()
wsd.start()
services = wsd.searchServices(types=[ttype], scopes=[scope])
for service in services:
uuid = service.getEPR().split(":")[2]
parsed = urllib.parse.urlparse(service.getXAddrs()[0])
addr, port = parsed.netloc.split(":")
scanned.append({"uuid": uuid, "addr": addr, "port": port})
wsd.stop()
if scanned:
wdsl_path = onvif.__path__[0].replace("onvif", "wsdl")
if self.printout:
print("ONVIF cameras found: {}".format(scanned))
print("Querying found ONVIF cameras for RTSP URIs..")
for scan in scanned:
try:
self.logger.debug(scan)
cam = {}
mycam = onvif.ONVIFCamera(
scan["addr"],
scan["port"],
self.user,
self.passwd,
wdsl_path,
)
device_info = mycam.devicemgmt.GetDeviceInformation()
self.logger.debug(device_info)
cam["make"] = device_info["Manufacturer"]
cam["model"] = device_info["Model"]
cam["streams"] = []
media_service = mycam.create_media_service()
profiles = media_service.GetProfiles()
for profile in profiles:
token = profile.token
stream_uri = media_service.GetStreamUri(
{
"StreamSetup": {
"Stream": "RTP-Unicast",
"Transport": "UDP",
},
"ProfileToken": token,
}
)
rtsp_uri = stream_uri["Uri"]
cam["streams"].append(rtsp_uri)
found.append(cam)
except onvif.ONVIFError as e:
if self.printout:
print("Error querying ONVIF camera: {}".format(e))
else:
if self.printout:
print("No ONVIF cameras discovered")
return None
self.logger.debug(found)
return found