in awstreamer/gst_pipeline/cv_pipeline.py [0:0]
def __init__(self, config):
    """Set up the OpenCV source, the optional sinks and the display windows.

    Args:
        config: Pipeline configuration. It is indexed with
            ``config["source"]`` and queried with ``config.isSet(key)`` /
            ``config.get(key)`` using dotted keys. Keys read here:

            * ``source.name`` (required) plus the optional ``source.type``,
              ``source.min_idx``, ``source.max_idx``, ``source.step``,
              ``source.restart``, ``source.rotate`` and ``source.display``.
            * ``sink.output`` (optional) together with ``sink.width``,
              ``sink.height`` and ``sink.fps``; either a path containing
              ".mp4" or a dict whose ``pipeline`` entry lists the GStreamer
              elements appended after ``appsrc``.
            * ``sink.pipeline`` (optional) configuration handed to
              ``awstreamer.client().start``.
            * ``sink.display`` (optional).
            * ``process`` (optional) replacement for ``self.process``.

    If the sink output is of an unsupported kind, or the writer cannot be
    opened, an error is logged and ``self.sink_output`` stays ``None``.
    """
    logger.info("Initializing %s..." % self.__class__.__name__)
    super().__init__(config)

    # OpenCV source element
    src = config["source"]
    self.source = cv2.VideoCapture(src["name"])
    self.src_type = src["type"] if "type" in src else "live"
    self.min_idx = src["min_idx"] if "min_idx" in src else -1
    self.max_idx = src["max_idx"] if "max_idx" in src else -1
    self.step = src["step"] if "step" in src else 1
    self.restart = src["restart"] if "restart" in src else False
    self.rotate = src["rotate"] if "rotate" in src else False

    # OpenCV sink element
    self.sink_output = None
    if config.isSet("sink.output"):
        output = config.get("sink.output")
        width = int(config.get("sink.width"))
        height = int(config.get("sink.height"))
        fps = int(config.get("sink.fps"))
        frame_size = (width, height)

        writer = None
        outstr = None
        if isinstance(output, str):
            outstr = output.strip()
            if ".mp4" in outstr:
                fourcc = cv2.VideoWriter_fourcc('m', 'p', '4', 'v')
                writer = cv2.VideoWriter(outstr, fourcc, fps, frame_size)
        elif isinstance(output, dict):
            pipeline = config.get("sink.output.pipeline")
            # Chain the configured elements after appsrc, in mapping order.
            elements = ["appsrc"] + [element for _, element in pipeline.items()]
            outstr = " ! ".join(elements)
            logger.info(outstr)
            # Arguments: pipeline string, apiPreference, fourcc (0), fps, size.
            writer = cv2.VideoWriter(outstr, cv2.CAP_GSTREAMER, 0, fps, frame_size)

        if writer is None:
            # Neither a ".mp4" path nor a pipeline dict: there is no writer
            # to query, so report it instead of dereferencing None.
            logger.error("Unsupported sink output: %r" % (output,))
        elif writer.isOpened():
            # The backend name is only queried once the writer is open;
            # OpenCV raises when asked for the backend of an unopened writer.
            logger.info("Backend: %s" % writer.getBackendName())
            logger.info("Successfully opened '%s' for writing" % outstr)
            self.sink_output = writer
        else:
            logger.error("Couldn't open '%s' for writing" % outstr)

    # GStreamer sink pipeline
    self.sink_pipeline = None
    if config.isSet("sink.pipeline"):
        client = awstreamer.client()
        # wait_for_finish=False: start() hands back the pipeline and its
        # thread instead of waiting for the pipeline to finish.
        self.sink_pipeline, self.sink_thread = client.start(
            config.get("sink.pipeline"), wait_for_finish=False
        )
        sink_config = self.sink_pipeline.config
        self.sink_width = sink_config.get("source.width")
        self.sink_height = sink_config.get("source.height")
        self.sink_fps = sink_config.get("source.fps")

    # OpenCV source display window: a string value names the window,
    # any other truthy value selects the default name.
    self.source_window_name = None
    source_display = config.get("source.display")
    if source_display:
        if isinstance(source_display, str):
            self.source_window_name = source_display
        else:
            self.source_window_name = "input"

    # OpenCV sink display window: same rule as for the source window.
    self.sink_window_name = None
    sink_display = config.get("sink.display")
    if sink_display:
        if isinstance(sink_display, str):
            self.sink_window_name = sink_display
        else:
            self.sink_window_name = "output"

    # Overwrite process function
    if config.isSet("process"):
        self.process = config.get("process")