in main.py [0:0]
def _split_pipeline_task_type(type_spec):
    """Split a ``<kind>.<handler>`` type string into its two parts.

    Returns a ``(kind, handler)`` tuple, or ``None`` when the string does not
    contain exactly one dot, so callers can raise a descriptive configuration
    error instead of hitting a tuple-unpacking ``ValueError``.
    """
    parts = type_spec.split('.')
    if len(parts) != 2:
        return None
    return parts[0], parts[1]


def _load_pipeline_handler_class(package, handler, suffix):
    """Import ``<package>.<handler>`` and return its ``<Handler><suffix>`` class."""
    # __import__ on a dotted name returns the top-level package, so the
    # submodule is fetched from it as an attribute.
    top_level = __import__('%s.%s' % (package, handler))
    handler_module = getattr(top_level, handler)
    return getattr(handler_module, '%s%s' % (handler.capitalize(), suffix))


def _render_pipeline_template(jinja_environment, source, name):
    """Compile ``source`` in ``jinja_environment``, name the template, render it."""
    template = jinja_environment.from_string(source)
    template.name = name
    return template.render()


def process_message_pipeline(logger,
                             config,
                             data,
                             event,
                             context,
                             using_webserver=False):
    """Run every task of the configured pipeline for one message.

    The Jinja environment returned by ``get_jinja_environment()`` is populated
    with ``data``, ``event``, ``context`` and ``using_webserver``, then with
    any configured ``macros`` and ``globals``. After the optional pipeline-wide
    ``concurrency`` and ``ignoreOn`` checks, the tasks in ``config['pipeline']``
    are executed in order. Each task may carry ``ignoreOn``, ``stopIf``,
    ``runIf``, ``variables``, ``output``, ``config``, ``description`` and
    ``canFail`` keys.

    Args:
        logger: Logger used for debug/info/warning/error messages.
        config: Pipeline configuration. Must contain ``pipeline``; may contain
            ``macros``, ``globals``, ``concurrency``, ``ignoreOn``, ``onError``
            and ``canFail``.
        data: Message payload, exposed to templates and passed to handlers.
        event: Triggering event, exposed to templates and passed to handlers.
        context: Invocation context, exposed to templates and passed to
            handlers. ``context.http_response`` is set when an output reports
            both a status code and a body.
        using_webserver: Exposed to templates as ``using_webserver``.

    Returns:
        None.

    Raises:
        NoPipelineConfiguredException: ``config['pipeline']`` is empty.
        MalformedMacrosException: ``config['macros']`` is not a list.
        MalformedGlobalsException: ``config['globals']`` is not a dict.
        NoTypeInPipelineException: A task (or the ``onError`` task) has a
            missing or malformed ``type``.
        Exception: Whatever a task raised, when neither the task nor the
            pipeline is marked ``canFail``.
    """
    template_variables = {
        'data': data,
        'event': event,
        'context': context,
        'using_webserver': using_webserver
    }

    # Truthiness also covers a ``pipeline:`` key that is present but null.
    if not config['pipeline']:
        raise NoPipelineConfiguredException('Empty pipeline configured!')

    jinja_environment = get_jinja_environment()
    jinja_environment.globals = {
        **jinja_environment.globals,
        **template_variables
    }

    helper = BaseHelper(jinja_environment)

    if 'macros' in config:
        if not isinstance(config['macros'], list):
            raise MalformedMacrosException(
                '"macros" in configuration should be a list.')

        macros = {}
        for macro in config['macros']:
            macro_template = jinja_environment.from_string(
                macro['macro'].strip())
            macro_template.name = 'macro'
            macro_template.render()
            macro_module = macro_template.module
            # Every public callable of the template module becomes a global,
            # wrapped by macro_helper.
            for name in dir(macro_module):
                if name.startswith("_"):
                    continue
                macro_func = getattr(macro_module, name)
                if callable(macro_func):
                    macros[name] = partial(macro_helper, macro_func)
        jinja_environment.globals.update(macros)

    if 'globals' in config:
        if not isinstance(config['globals'], dict):
            raise MalformedGlobalsException(
                '"globals" in configuration should be a dictionary.')
        template_globals = helper._jinja_expand_dict_all(
            config['globals'], 'globals')
        jinja_environment.globals = {
            **jinja_environment.globals,
            **template_globals
        }

    if 'concurrency' in config:
        if not handle_concurrency_pre(logger, config['concurrency'],
                                      jinja_environment, template_variables):
            return

    if 'ignoreOn' in config:
        if not handle_ignore_on(logger, config['ignoreOn'], jinja_environment,
                                template_variables):
            # NOTE(review): this return (and the per-task ignoreOn return
            # below) skips handle_concurrency_post and _clean_tempdir even
            # though handle_concurrency_pre has already run - confirm that
            # this is intended.
            return

    for task_number, task in enumerate(config['pipeline'], start=1):
        if 'type' not in task or not task['type']:
            raise NoTypeInPipelineException('No type in pipeline task #%d: %s' %
                                            (task_number, str(task)))

        description = ' '
        if 'description' in task:
            description = ' "%s" ' % (task['description'])

        # A type must be exactly "<processor|output>.<handler>"; anything else
        # is reported as a configuration error rather than a ValueError.
        type_parts = _split_pipeline_task_type(task['type'])
        if (type_parts is None or not type_parts[0] or not type_parts[1]
                or type_parts[0] not in ('processor', 'output')):
            raise NoTypeInPipelineException(
                'Malformed type in pipeline task #%d: %s' %
                (task_number, str(task)))
        task_type, task_handler = type_parts

        task_config = task['config'] if 'config' in task else {}

        # Handle resend prevention mechanism
        if 'ignoreOn' in task:
            if not handle_ignore_on(logger, task['ignoreOn'], jinja_environment,
                                    template_variables):
                return

        # Handle stop processing mechanism: any non-whitespace render stops
        # the whole pipeline.
        if 'stopIf' in task:
            stopif_contents = _render_pipeline_template(
                jinja_environment, task['stopIf'], 'stopif')
            if stopif_contents.strip() != '':
                jinja_environment.globals['previous_run'] = False
                logger.info(
                    'Pipeline task%s#%d (%s) stop-if condition evaluated to true (non-empty), stopping processing.'
                    % (description, task_number, task['type']))
                # NOTE(review): handle_concurrency_post is not called on this
                # path - confirm that this is intended.
                helper._clean_tempdir()
                return

        # Handle conditional execution mechanism: an empty render skips only
        # this task.
        if 'runIf' in task:
            runif_contents = _render_pipeline_template(jinja_environment,
                                                       task['runIf'], 'runif')
            if runif_contents.strip() == '':
                jinja_environment.globals['previous_run'] = False
                logger.info(
                    'Pipeline task%s#%d (%s) run-if condition evaluated to false (empty), skipping task.'
                    % (description, task_number, task['type']))
                continue

        # Process task wide variables; each is expanded according to its type
        # and published as a template global.
        if 'variables' in task:
            for k, v in task['variables'].items():
                if isinstance(v, dict):
                    expanded = helper._jinja_expand_dict_all(v, 'variable')
                elif isinstance(v, list):
                    expanded = helper._jinja_expand_list(v, 'variable')
                elif isinstance(v, int):
                    expanded = helper._jinja_expand_int(v, 'variable')
                else:
                    expanded = helper._jinja_expand_string(v, 'variable')
                jinja_environment.globals[k] = expanded

        try:
            # Handle output variable expansion
            output_var = task['output'] if 'output' in task else None
            if output_var:
                if isinstance(output_var, str):
                    # Expand output variable if it's a Jinja expression
                    output_var = _render_pipeline_template(
                        jinja_environment, output_var, 'output')
                elif isinstance(output_var, dict):
                    output_var = {
                        k: _render_pipeline_template(jinja_environment, v,
                                                     'output')
                        for k, v in output_var.items()
                    }

            # Handle the actual work
            if task_type == 'processor':  # Handle processor
                processor = task_handler
                logger.debug(
                    'Pipeline task%s#%d (%s), running processor: %s' %
                    (description, task_number, task['type'], processor))
                processor_class = _load_pipeline_handler_class(
                    'processors', processor, 'Processor')
                processor_instance = processor_class(task_config,
                                                     jinja_environment, data,
                                                     event, context)
                if output_var:
                    processor_variables = processor_instance.process(
                        output_var=output_var)
                else:
                    processor_variables = processor_instance.process()
                # Whatever the processor returned becomes visible to the
                # templates of all following tasks.
                template_variables.update(processor_variables)
                template_variables['previous_run'] = True
                jinja_environment.globals = {
                    **jinja_environment.globals,
                    **template_variables
                }
            elif task_type == 'output':  # Handle output
                output_type = task_handler
                logger.debug(
                    'Pipeline task%s#%d (%s), running output: %s' %
                    (description, task_number, task['type'], output_type))
                output_class = _load_pipeline_handler_class(
                    'output', output_type, 'Output')
                # The task configuration is passed as both of the first two
                # constructor arguments.
                output_instance = output_class(task_config, task_config,
                                               jinja_environment, data, event,
                                               context)
                output_instance.output()
                # HTTP response
                if output_instance.status_code and output_instance.body:
                    context.http_response = (output_instance.status_code,
                                             output_instance.headers,
                                             output_instance.body)
                jinja_environment.globals['previous_run'] = True
        except Exception:
            jinja_environment.globals['previous_run'] = False
            if 'canFail' in task and task['canFail']:
                logger.warning(
                    'Pipeline task%s#%d (%s) failed, but continuing...' %
                    (description, task_number, task['type']),
                    extra={'exception': traceback.format_exc()})
                continue

            # The stack trace is captured up front so the log entry below
            # always describes the task failure.
            failure_traceback = traceback.format_exc()
            failure_message = str(sys_exc_message())

            # Global output if a task fails
            if 'onError' in config:
                error_task = config['onError']
                if 'type' not in error_task or not error_task['type']:
                    raise NoTypeInPipelineException(
                        'No type in pipeline onError task')

                jinja_environment.globals['exception'] = failure_message
                # Only the handler part is used: onError is always run as an
                # output.
                error_type_parts = _split_pipeline_task_type(
                    error_task['type'])
                if error_type_parts is None or not error_type_parts[1]:
                    raise NoTypeInPipelineException(
                        'Malformed type in pipeline onError task: %s' %
                        (str(error_task)))
                output_type = error_type_parts[1]

                error_task_config = (error_task['config']
                                     if 'config' in error_task else {})

                logger.debug(
                    'Pipeline onError task (%s), running output: %s' %
                    (error_task['type'], output_type))
                output_class = _load_pipeline_handler_class(
                    'output', output_type, 'Output')
                output_instance = output_class(error_task_config,
                                               error_task_config,
                                               jinja_environment, data, event,
                                               context)
                output_instance.output()

            logger.error(
                'Pipeline task%s#%d (%s) failed, stopping processing.' %
                (description, task_number, task['type']),
                extra={'exception': failure_traceback})

            pipeline_can_fail = 'canFail' in config and config['canFail']
            if pipeline_can_fail:
                logger.warning(
                    'Pipeline failed, but it is allowed to fail. Message processed.'
                )

            # Both outcomes release concurrency handling and temporary files
            # before leaving.
            if 'concurrency' in config:
                handle_concurrency_post(logger, config['concurrency'],
                                        jinja_environment, template_variables)
            helper._clean_tempdir()
            if pipeline_can_fail:
                return
            raise

    if 'concurrency' in config:
        handle_concurrency_post(logger, config['concurrency'],
                                jinja_environment, template_variables)
    helper._clean_tempdir()


def sys_exc_message():
    """Return the exception currently being handled (``None`` outside except)."""
    import sys
    return sys.exc_info()[1]