in awstreamer/gst_pipeline/stream_client.py [0:0]
def schedule(self, config_or_filename=None, wait_for_finish=False, restart_on_exception=False):
    '''
    Start one or more pipelines asynchronously, in parallel
    '''
    try:
        # Get config in the proper format
        config = self.get_config(config_or_filename)
        # Update cached config
        self.update(config)
        # Check for misuse of schedule()
        for i in ['debug', 'enabled']:
            if i in config and isinstance(config[i], bool):
                logger.error("Either use start() instead of schedule() or nest the configuration into a pipeline.")
                return config
        # Stop pipelines that are going to be reconfigured
        for key in self.futures:
            if key in config:
                logger.info("Cancelling the following pipeline: %s" % key)
                if not self.futures[key].cancel():
                    logger.error("Failed to cancel the following pipeline: %s" % key)
                    continue
        # Spin off each camera pipeline in a separate thread/process
        for key in config:
            # Cap at MAX_PIPELINE_COUNT
            if len(self.futures) >= StreamClient.MAX_PIPELINE_COUNT:
                logger.error("Maximum number of pipelines reached. Not configuring %s" % key)
                continue
            # Skip sources that are disabled in the configuration
            if "enabled" in config[key] and not config[key]["enabled"]:
                logger.info("Skipping %s (disabled)" % key)
                del self.config[key]
                continue
            # Start new process/thread
            future = self.pool.schedule(stream_pipeline, args=[key, config[key]])
            if restart_on_exception:
                future.add_done_callback(self.catch_and_restart)
            self.futures[key] = future
        # This is a blocking call, therefore use with caution (it will prevent parallel execution!)
        if wait_for_finish:
            for key in self.futures:
                logger.info(self.futures[key].result())
    except Exception as e:
        logger.error("Failed to start pipeline(s): " + repr(e))
        self.config["error"] = repr(e)
    return self.config
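
A minimal usage sketch, not part of the source file: it assumes StreamClient can be constructed without arguments and that the config maps pipeline names to per-pipeline dictionaries, as schedule() implies above; names such as "camera0" and "source" are illustrative only.

    # Hypothetical caller: schedule two pipelines in parallel.
    client = StreamClient()
    pipelines = {
        "camera0": {"enabled": True, "source": "/dev/video0"},   # illustrative pipeline settings
        "camera1": {"enabled": False, "source": "/dev/video1"},  # disabled, so schedule() skips it
    }
    # Non-blocking by default; pass wait_for_finish=True to block until every pipeline returns.
    result = client.schedule(pipelines, restart_on_exception=True)
    # On failure, the returned config carries an "error" key holding the repr of the exception.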