sample_app/onvif_camera_mock/artifacts/main.py (74 lines of code) (raw):

#!/usr/bin/env python3 # Run privileged: `sudo /usr/bin/python3 rtsp-feed.py` import os import sys import gi import subprocess gi.require_version('Gst', '1.0') gi.require_version('GstRtspServer', '1.0') from gi.repository import Gst, GstRtspServer, GObject, GLib # Ask wsdd nicely to terminate. interface = os.environ.get('INTERFACE') if interface is None: print("No interface such as 'eth0' or 'eno1' provided") sys.exit(1) directory = os.environ.get('DIRECTORY') if directory is None: directory = "/onvif-camera-mock" print("No scripts directory provided. Using the directory the script was executed from: {}".format(directory)) else: print("Using provided directory. The script was executed from:: {}".format(directory)) firmware_ver = os.environ.get('FIRMWARE') if firmware_ver is None: print("No firmware version provided. Using default 1.0") firmware_ver = "1.0" else: print("Using provided firmware version: {}".format(firmware_ver)) if os.system("pgrep wsdd > /dev/null") == 0: print("Killing previous wssd instances") os.system("sudo pkill wsdd") # Forcibly terminate. if os.system("pgrep wsdd > /dev/null") == 0: os.system("sudo pkill -9 wsdd") # Ask onvif server nicely to terminate. if os.system("pgrep onvif_srvd > /dev/null") == 0: print("Killing previous onvif_srvd instance") os.system("sudo pkill onvif_srvd") # Forcibly terminate. if os.system("pgrep onvif_srvd > /dev/null") == 0: os.system("sudo pkill -9 onvif_srvd") # Ask onvif server nicely to terminate. if os.system("pgrep rtsp-feed.py > /dev/null") == 0: print("Killing rtsp-feed.py instance") os.system("sudo pkill rtsp-feed.py") # Forcibly terminate. if os.system("pgrep rtsp-feed.py > /dev/null") == 0: os.system("sudo pkill -9 rtsp-feed.py") ip4 = subprocess.check_output(["/sbin/ip", "-o", "-4", "addr", "list", interface]).decode().split()[3].split('/')[0] os.system("sudo {}/onvif_srvd/onvif_srvd --ifs {} --scope onvif://www.onvif.org/name/TestDev --scope onvif://www.onvif.org/Profile/S --name RTSP --width 800 --height 600 --url rtsp://{}:8554/stream1 --type MPEG4 --firmware_ver {}".format(directory, interface, ip4, firmware_ver)) os.system("{}/wsdd/wsdd --if_name {} --type tdn:NetworkVideoTransmitter --xaddr http://{}:1000/onvif/device_service --scope \"onvif://www.onvif.org/name/Unknown onvif://www.onvif.org/Profile/Streaming\"".format(directory, interface, ip4)) loop = GLib.MainLoop() Gst.init(None) class TestRtspMediaFactory(GstRtspServer.RTSPMediaFactory): def __init__(self): GstRtspServer.RTSPMediaFactory.__init__(self) def do_create_element(self, url): global mp4File # Defualt value to create color bar video if (mp4File == "4080751"): mock_pipeline = "videotestsrc pattern=bar horizontal-speed=2 background-color=9228238 foreground-color={0} ! x264enc ! queue ! rtph264pay name=pay0 config-interval=1 pt=96".format(mp4File) else: #set mp4 file path to filesrc's location property src_demux = "filesrc location={} ! qtdemux name=demux".format(mp4File) h264_transcode = "demux.video_0" mock_pipeline = "{0} {1} ! queue ! rtph264pay name=pay0 config-interval=1 pt=96".format(src_demux, h264_transcode) print ("Pipeling launching: {}".format(mock_pipeline)) return Gst.parse_launch(mock_pipeline) class GstreamerRtspServer(): def __init__(self): self.rtspServer = GstRtspServer.RTSPServer() factory = TestRtspMediaFactory() factory.set_shared(True) mountPoints = self.rtspServer.get_mount_points() mountPoints.add_factory("/stream1", factory) self.rtspServer.attach(None) # Optionally pass in video file for streaming, if not use default color bar video if __name__ == '__main__': global mp4File mp4File = os.environ.get('MP4FILE') if mp4File is None: mp4File = "4080751" print ("Default video bar color is 4080751") else: print ("Using provided video file: {}".format(str(mp4File))) s = GstreamerRtspServer() loop.run()