def initializeFunctions()

in streampipes-client-python/streampipes/functions/function_handler.py [0:0]


    def initializeFunctions(self) -> None:
        """Creates the context for every data stream and starts the event loop to manage the StreamPipes Functions.

        For every registered function the output data streams are posted to the data stream API.
        Each required stream is then fetched from the API, a broker is selected for it and the
        function is attached to the context of that stream.
        Finally, `_function_loop` is either run to completion (if no event loop is running)
        or scheduled as a task on the already running event loop.

        Returns
        -------
        None

        Raises
        ------
        RuntimeError
            If posting an output data stream fails with an HTTP status other than `BAD_REQUEST`.
        """
        for streampipes_function in self.registration.getFunctions():
            # Create the output data streams for every function
            for stream_id, output_stream in streampipes_function.function_definition.get_output_data_streams().items():
                try:
                    self.client.dataStreamApi.post(output_stream)
                except HTTPError as e:
                    logger.info("The data stream could not be created.")
                    if e.response.status_code != HTTPStatus.BAD_REQUEST:
                        # Anything but "already exists" is unexpected; keep the HTTP error as the cause
                        raise RuntimeError(f"Could not create the output data stream '{stream_id}'.") from e
                    logger.info(
                        "This is due to the fact that this data stream already exists. "
                        "Continuing with the existing data stream."
                    )
                logger.info(
                    f"Using output data stream '{stream_id}' for function '{streampipes_function.getFunctionId().id}'"
                )
            # Choose the broker and collect the schema for every data stream
            for stream_id in streampipes_function.requiredStreamIds():
                # Get the data stream schema from the API
                data_stream: DataStream = self.client.dataStreamApi.get(stream_id)  # type: ignore
                # Get the broker
                broker: Consumer = get_broker(data_stream)  # type: ignore
                # Assign the functions, broker and schema to every stream
                if stream_id in self.stream_contexts:
                    # The stream is already known: only register the additional function
                    self.stream_contexts[stream_id].add_function(streampipes_function)
                else:
                    self.stream_contexts[stream_id] = DataStreamContext(
                        functions=[streampipes_function], schema=data_stream, broker=broker
                    )
                logger.info(f"Using {broker.__class__.__name__} for {streampipes_function.__class__.__name__}")

        # Start the function loop or add it as tasks if a loop is already running
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # No event loop is running in this thread: run the function loop to completion
            asyncio.run(self._function_loop())
        else:
            task = loop.create_task(self._function_loop())
            # The event loop only keeps weak references to tasks, so hold a strong reference
            # until the task is done; otherwise it may be garbage collected mid-execution.
            # The set is created lazily on first use.
            pending_tasks = getattr(self, "_function_loop_tasks", None)
            if pending_tasks is None:
                pending_tasks = self._function_loop_tasks = set()
            pending_tasks.add(task)
            task.add_done_callback(pending_tasks.discard)