in camera-sdk/iotccsdk/frame_iterators.py [0:0]
def start(self, result_src):
    """
    Generate inferences from the camera's RTSP VA stream.

    Runs ``gst-launch-1.0`` with a ``fakesink dump=true`` pipeline on
    `result_src`, reads the dump text the process writes to stdout line by
    line and accumulates it in ``self._json_str``. Once the accumulated
    text holds a message with an object list (``:[`` ... ``] }``), the
    message is cut out, ``self._get_inference_result()`` is called and its
    return value is yielded. Messages that close without an object list
    are discarded.

    Parameters
    ----------
    result_src : str
        VA RTSP stream url.

    Yields
    ------
    CameraInference
        An object of `CameraInference` type.
        Which has timestamp and list of `CameraInferenceObject` objects.

    Raises
    ------
    Exception
        Any exception that occurs during inference handling. This includes
        an output line containing ``ERROR``/``error``, an unsupported
        platform (``RuntimeError``) and a failure to launch
        ``gst-launch-1.0`` (``OSError``). Every exception is logged before
        being re-raised.
    """
    # Pass the pipeline as an argument list (no shell) so that characters
    # such as '&' or ';' inside the URL are never interpreted by a shell.
    args = ['gst-launch-1.0',
            '-q',
            'rtspsrc',
            'location=%s' % result_src,
            'protocols=tcp',
            '!',
            'application/x-rtp,',
            'media=application',
            '!',
            'fakesink',
            'dump=true']
    self.logger.info('result_src: %s', result_src)
    self.logger.info('gstreamer cmd: %s', ' '.join(args))

    platform = sys.platform.lower()
    self.logger.info('Platform: %s', platform)

    proc = None
    try:
        # Column at which the payload text starts in every dump line.
        # Presumably the width of the dump prefix differs per platform
        # -- TODO confirm against real gst-launch output.
        # NOTE(review): 'win' also matches 'darwin' and 'cygwin'; confirm
        # that those platforms are meant to use the Windows offset.
        if 'win' in platform:
            data_idx = 78
        elif 'linux' in platform:
            data_idx = 72
        else:
            # Fail before spawning anything instead of hitting an unbound
            # local on the first line of output.
            raise RuntimeError('Unsupported platform: %s' % platform)

        # NOTE(review): stderr is piped but never read in this method; a
        # child that writes a lot to stderr could stall -- confirm whether
        # it should be drained or redirected.
        proc = subprocess.Popen(args, stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE, bufsize=1,
                                universal_newlines=True)
        self._sub_proc = proc

        for line in proc.stdout:
            if 'ERROR' in line or 'error' in line:
                raise Exception(line)

            chunk = line[data_idx:].strip(os.linesep)
            self.logger.debug(chunk)

            buffered = self._json_str
            combined = buffered + chunk

            if ":[" in buffered and "] }" in combined:
                # Only yield if objects are present in the inferences
                self._json_str = combined
                s_idx = combined.index('{ "')
                e_idx = combined.index("] }") + 3
                self._json_str = combined[s_idx:e_idx]
                self.logger.debug(self._json_str)
                result = self._get_inference_result()
                self._json_str = ""
                yield result
            elif (":[" not in buffered
                  and '{ "' in buffered
                  and " }" in combined):
                # A message closed without an object list: drop it.
                self._json_str = ""
            else:
                # Message still incomplete: keep accumulating.
                self._json_str = combined
    except Exception as e:
        self.logger.exception(e)
        raise
    finally:
        # Runs on normal exhaustion, on errors and when the consumer closes
        # the generator early, so the child and its pipes never leak.
        if proc is not None:
            try:
                if proc.poll() is None:
                    proc.terminate()
                for stream in (proc.stdout, proc.stderr):
                    if stream is not None:
                        stream.close()
                proc.wait()
            except OSError:
                # Best-effort cleanup; never mask the original exception.
                pass