in streampipes-client-python/streampipes/functions/function_handler.py [0:0]
def initializeFunctions(self) -> None:
    """Creates the context for every data stream and starts the event loop to manage the StreamPipes Functions.

    For every registered function the output data streams are posted to the
    data stream API first. Afterwards each required input stream is fetched
    from the API, a broker is resolved for it and the function is attached
    to the matching entry of ``self.stream_contexts`` (a new context is
    created when the stream id is seen for the first time).

    Finally ``self._function_loop()`` is started: via ``asyncio.run`` when no
    event loop is running (this call blocks until the coroutine finishes),
    otherwise it is scheduled as a task on the already running loop.

    Returns
    -------
    None

    Raises
    ------
    RuntimeError
        If posting an output data stream fails with an HTTP error whose
        status code is not ``400 Bad Request``.
    """
    for streampipes_function in self.registration.getFunctions():
        # Create the output data streams for every function
        for stream_id, output_stream in streampipes_function.function_definition.get_output_data_streams().items():
            try:
                self.client.dataStreamApi.post(output_stream)
            except HTTPError as e:
                logger.info("The data stream could not be created.")
                if e.response.status_code == HTTPStatus.BAD_REQUEST:
                    # A 400 response is treated as "stream already exists", so it is safe to go on
                    logger.info(
                        "This is due to the fact that this data stream already exists. "
                        "Continuing with the existing data stream."
                    )
                else:
                    # Any other HTTP error is fatal; keep the original error as cause
                    raise RuntimeError(
                        f"The output data stream '{stream_id}' could not be created "
                        f"(HTTP status {e.response.status_code})."
                    ) from e
            logger.info(
                f"Using output data stream '{stream_id}' for function '{streampipes_function.getFunctionId().id}'"
            )

        # Choose the broker and collect the schema for every data stream
        for stream_id in streampipes_function.requiredStreamIds():
            # Get the data stream schema from the API
            data_stream: DataStream = self.client.dataStreamApi.get(stream_id)  # type: ignore
            # Get the broker
            broker: Consumer = get_broker(data_stream)  # type: ignore
            # Assign the functions, broker and schema to every stream
            if stream_id in self.stream_contexts:
                # The stream is already known: only register the additional function
                self.stream_contexts[stream_id].add_function(streampipes_function)
            else:
                self.stream_contexts[stream_id] = DataStreamContext(
                    functions=[streampipes_function], schema=data_stream, broker=broker
                )
            logger.info(f"Using {broker.__class__.__name__} for {streampipes_function.__class__.__name__}")

    # Start the function loop or add it as tasks if a loop is already running
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread: run the function loop to completion
        asyncio.run(self._function_loop())
    else:
        # NOTE(review): the returned task is not stored; the asyncio docs recommend keeping
        # a reference so the task cannot be garbage-collected mid-execution - confirm.
        loop.create_task(self._function_loop())