def process_message_pipeline()

in main.py


def process_message_pipeline(logger,
                             config,
                             data,
                             event,
                             context,
                             using_webserver=False):
    template_variables = {
        'data': data,
        'event': event,
        'context': context,
        'using_webserver': using_webserver
    }
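    # Refuse to run if the configuration contains an empty pipeline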
    if len(config['pipeline']) == 0:
        raise NoPipelineConfiguredException('Empty pipeline configured!')

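    # Build the Jinja environment and expose the message data, event and
    # runtime context to all templates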
    jinja_environment = get_jinja_environment()
    jinja_environment.globals = {
        **jinja_environment.globals,
        **template_variables
    }

    helper = BaseHelper(jinja_environment)

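    # Register user-defined Jinja macros from the configuration as callable
    # template globals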
    if 'macros' in config:
        if not isinstance(config['macros'], list):
            raise MalformedMacrosException(
                '"macros" in configuration should be a list.')
        macros = {}
        for macro in config['macros']:
            macro_template = jinja_environment.from_string(
                macro['macro'].strip())
            macro_template.name = 'macro'
            macro_template.render()
            macro_module = macro_template.module

            for f in dir(macro_module):
                if not f.startswith("_") and callable(getattr(macro_module, f)):
                    macro_func = getattr(macro_module, f)
                    macros[f] = partial(macro_helper, macro_func)

        jinja_environment.globals.update(macros)

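    # Expand configuration-level globals (values may themselves be Jinja expressions)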
    if 'globals' in config:
        if not isinstance(config['globals'], dict):
            raise MalformedGlobalsException(
                '"globals" in configuration should be a dictionary.')

        template_globals = helper._jinja_expand_dict_all(
            config['globals'], 'globals')
        jinja_environment.globals = {
            **jinja_environment.globals,
            **template_globals
        }

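    # Pre-flight checks: stop early if concurrency limits or a pipeline-level
    # ignoreOn condition apply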
    if 'concurrency' in config:
        if not handle_concurrency_pre(logger, config['concurrency'],
                                      jinja_environment, template_variables):
            return
    if 'ignoreOn' in config:
        if not handle_ignore_on(logger, config['ignoreOn'], jinja_environment,
                                template_variables):
            return

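    # Run each pipeline task in order, tracking the task number for log messages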
    task_number = 1
    for task in config['pipeline']:
        if 'type' not in task or not task['type']:
            raise NoTypeInPipelineException('No type in pipeline task #%d: %s' %
                                            (task_number, str(task)))

        description = ' '
        if 'description' in task:
            description = ' "%s" ' % (task['description'])

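        # Task types take the form "processor.<name>" or "output.<name>"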
        task_type, task_handler = task['type'].split('.', 1)
        if not task_type or not task_handler or task_type not in [
                'processor', 'output'
        ]:
            raise NoTypeInPipelineException(
                'Malformed type in pipeline task #%d: %s' %
                (task_number, str(task)))

        task_config = {}
        if 'config' in task:
            task_config = task['config']

        # Handle resend prevention mechanism
        if 'ignoreOn' in task:
            if not handle_ignore_on(logger, task['ignoreOn'], jinja_environment,
                                    template_variables):
                return

        # Handle stop processing mechanism
        if 'stopIf' in task:
            stopif_template = jinja_environment.from_string(task['stopIf'])
            stopif_template.name = 'stopif'
            stopif_contents = stopif_template.render()
            if stopif_contents.strip() != '':
                jinja_environment.globals['previous_run'] = False
                logger.info(
                    'Pipeline task%s#%d (%s) stop-if condition evaluated to true (non-empty), stopping processing.'
                    % (description, task_number, task['type']))
                helper._clean_tempdir()
                return

        # Handle conditional execution mechanism
        if 'runIf' in task:
            runif_template = jinja_environment.from_string(task['runIf'])
            runif_template.name = 'runif'
            runif_contents = runif_template.render()
            if runif_contents.strip() == '':
                jinja_environment.globals['previous_run'] = False
                logger.info(
                    'Pipeline task%s#%d (%s) run-if condition evaluated to false (empty), skipping task.'
                    % (description, task_number, task['type']))
                task_number += 1
                continue

        # Process task wide variables
        if 'variables' in task:
            for k, v in task['variables'].items():
                if isinstance(v, dict):
                    jinja_environment.globals[
                        k] = helper._jinja_expand_dict_all(v, 'variable')
                elif isinstance(v, list):
                    jinja_environment.globals[k] = helper._jinja_expand_list(
                        v, 'variable')
                elif isinstance(v, int):
                    jinja_environment.globals[k] = helper._jinja_expand_int(
                        v, 'variable')
                else:
                    jinja_environment.globals[k] = helper._jinja_expand_string(
                        v, 'variable')

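        # Run the task; failures are handled below according to the task's
        # canFail flag and the global onError output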
        try:
            # Handle output variable expansion
            output_var = task.get('output')
            if output_var:
                if isinstance(output_var, str):
                    # Expand output variable if it's a Jinja expression
                    output_var_template = jinja_environment.from_string(
                        output_var)
                    output_var_template.name = 'output'
                    output_var = output_var_template.render()
                elif isinstance(output_var, dict):
                    new_output_var = {}
                    for k, v in output_var.items():
                        output_var_template = jinja_environment.from_string(v)
                        output_var_template.name = 'output'
                        new_output_var[k] = output_var_template.render()
                    output_var = new_output_var

            # Handle the actual work
            if task_type == 'processor':  # Handle processor
                processor = task_handler
                logger.debug(
                    'Pipeline task%s#%d (%s), running processor: %s' %
                    (description, task_number, task['type'], processor))
                mod = __import__('processors.%s' % processor)
                processor_module = getattr(mod, processor)
                processor_class = getattr(
                    processor_module, '%sProcessor' % processor.capitalize())

                processor_instance = processor_class(task_config,
                                                     jinja_environment, data,
                                                     event, context)
                if output_var:
                    processor_variables = processor_instance.process(
                        output_var=output_var)
                else:
                    processor_variables = processor_instance.process()
                template_variables.update(processor_variables)
                template_variables['previous_run'] = True
                jinja_environment.globals = {
                    **jinja_environment.globals,
                    **template_variables
                }
            elif task_type == 'output':  # Handle output
                output_type = task_handler
                logger.debug(
                    'Pipeline task%s#%d (%s), running output: %s' %
                    (description, task_number, task['type'], output_type))
                mod = __import__('output.%s' % output_type)
                output_module = getattr(mod, output_type)
                output_class = getattr(output_module,
                                       '%sOutput' % output_type.capitalize())
                output_instance = output_class(task_config, task_config,
                                               jinja_environment, data, event,
                                               context)
                output_instance.output()
                # HTTP response
                if output_instance.status_code and output_instance.body:
                    context.http_response = (output_instance.status_code,
                                             output_instance.headers,
                                             output_instance.body)
                jinja_environment.globals['previous_run'] = True
        except Exception as exc:
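            # Mark the run as failed so later templates that check previous_run see it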
            jinja_environment.globals['previous_run'] = False
            if 'canFail' not in task or not task['canFail']:

                # Global output if a task fails
                if 'onError' in config:
                    error_task = config['onError']
                    if 'type' not in error_task or not error_task['type']:
                        raise NoTypeInPipelineException(
                            'No type in pipeline onError task')

                    jinja_environment.globals['exception'] = str(exc)

                    error_task_type, error_task_handler = error_task[
                        'type'].split('.', 1)

                    error_task_config = {}
                    if 'config' in error_task:
                        error_task_config = error_task['config']

                    output_type = error_task_handler
                    logger.debug(
                        'Pipeline onError task (%s), running output: %s' %
                        (error_task['type'], output_type))
                    mod = __import__('output.%s' % output_type)
                    output_module = getattr(mod, output_type)
                    output_class = getattr(
                        output_module, '%sOutput' % output_type.capitalize())
                    output_instance = output_class(error_task_config,
                                                   error_task_config,
                                                   jinja_environment, data,
                                                   event, context)
                    output_instance.output()

                logger.error(
                    'Pipeline task%s#%d (%s) failed, stopping processing.' %
                    (description, task_number, task['type']),
                    extra={'exception': traceback.format_exc()})
                if 'canFail' in config and config['canFail']:
                    logger.warning(
                        'Pipeline failed, but it is allowed to fail. Message processed.'
                    )
                    if 'concurrency' in config:
                        handle_concurrency_post(logger, config['concurrency'],
                                                jinja_environment,
                                                template_variables)

                    helper._clean_tempdir()
                    return
                else:
                    if 'concurrency' in config:
                        handle_concurrency_post(logger, config['concurrency'],
                                                jinja_environment,
                                                template_variables)
                    helper._clean_tempdir()
                    raise exc
            else:
                logger.warning(
                    'Pipeline task%s#%d (%s) failed, but continuing...' %
                    (description, task_number, task['type']),
                    extra={'exception': traceback.format_exc()})

        task_number += 1
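
    # All tasks finished: release the concurrency slot (if any) and clean up
    # temporary files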
    if 'concurrency' in config:
        handle_concurrency_post(logger, config['concurrency'],
                                jinja_environment, template_variables)
    helper._clean_tempdir()
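

For reference, a minimal sketch of how this function might be invoked. The configuration, logger name and task names below are illustrative assumptions, not taken from the source: "genericjson" and "logger" must correspond to modules that actually exist under processors/ and output/, and data, event and context normally come from the message trigger.

import logging

# Hypothetical minimal configuration: one processor followed by one output task.
example_config = {
    'pipeline': [
        {
            'type': 'processor.genericjson',
            'description': 'Parse the message payload as JSON',
        },
        {
            'type': 'output.logger',
            'description': 'Log the processed message',
            # Only run if the previous task succeeded (non-empty renders as true)
            'runIf': '{% if previous_run %}1{% endif %}',
            'canFail': True,
        },
    ],
}

logger = logging.getLogger('pipeline-example')

# data, event and context are supplied by the trigger, so the call is shown
# commented out here:
# process_message_pipeline(logger, example_config, data, event, context)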