in main.py [0:0]
def process_message_legacy(logger, config, data, event, context):
    """Run the legacy pipeline: processors, filters, resend gate, outputs.

    The steps run in this order:

    1. Every entry of ``config['processors']`` (if present) is imported from
       the ``processors`` package, instantiated and run; the variables it
       returns are merged into the Jinja environment globals.
    2. ``config['processIf']`` (if present) is rendered; an empty/whitespace
       result stops processing.
    3. If ``config['resendBucket']`` is set, a marker object in that bucket
       decides whether the message may be sent now.
    4. Every entry of ``config['outputs']`` is imported from the ``output``
       package, instantiated and run.

    Args:
        logger: Logger used for progress and error reporting.
        config: Mapping holding the pipeline configuration (keys read here:
            ``processors``, ``processIf``, ``resendBucket``, ``resendPeriod``,
            ``resendKey``, ``outputs``, ``allOutputsMustSucceed`` and the
            per-processor configuration keys).
        data: Message payload; exposed to templates as ``data`` and handed to
            processors and outputs.
        event: Exposed to templates as ``event`` and handed to processors and
            outputs.
        context: Exposed to templates as ``context`` and handed to processors
            and outputs.

    Returns:
        None. The function returns early when a ``processIf`` expression
        renders empty or when the resend gate refuses the message.

    Raises:
        NoResendConfigException: ``resendBucket`` is set without
            ``resendPeriod``.
        NoTypeConfiguredException: An output entry has no ``type``.
        NoOutputsConfiguredException: ``config`` has no ``outputs`` key.
        Exception: Whatever an output raises, when it is the only output or
            when ``allOutputsMustSucceed`` is truthy.
    """
    template_variables = {
        'data': data,
        'event': event,
        'context': context,
    }
    jinja_environment = get_jinja_environment()

    if 'processors' in config:
        for processor in config['processors']:
            config_key = None
            output_var = None
            # A processor entry is either a bare name or a dict carrying the
            # name plus optional 'config' and 'output' overrides.
            if isinstance(processor, dict):
                config_key = processor.get('config')
                if config_key:
                    # Expand config key if it's a Jinja expression
                    config_key = _render_legacy_template(
                        jinja_environment, config_key, 'config')

                output_var = processor.get('output')
                if output_var:
                    if isinstance(output_var, str):
                        # Expand output variable if it's a Jinja expression
                        output_var = _render_legacy_template(
                            jinja_environment, output_var, 'output')
                    elif isinstance(output_var, dict):
                        output_var = {
                            key: _render_legacy_template(
                                jinja_environment, value, 'output')
                            for key, value in output_var.items()
                        }

                processor = processor['processor']

            logger.debug('Processing message using input processor: %s' %
                         processor)
            # __import__ on a dotted name returns the top-level package, so
            # the submodule is fetched from it by attribute.
            mod = __import__('processors.%s' % processor)
            processor_module = getattr(mod, processor)
            processor_class = getattr(processor_module,
                                      '%sProcessor' % processor.capitalize())

            if not config_key:
                config_key = processor_class.get_default_config_key()
            processor_config = config[config_key] if config_key in config else {}

            processor_instance = processor_class(processor_config,
                                                 jinja_environment, data,
                                                 event, context)
            if output_var:
                processor_variables = processor_instance.process(
                    output_var=output_var)
            else:
                processor_variables = processor_instance.process()

            # Publish the new variables right away so that later processors
            # (and every template rendered below) can reference them.
            template_variables.update(processor_variables)
            jinja_environment.globals = {
                **jinja_environment.globals,
                **template_variables
            }

    if 'processIf' in config:
        processif_contents = _render_legacy_template(jinja_environment,
                                                     config['processIf'],
                                                     'processif')
        if processif_contents.strip() == '':
            logger.info(
                'Will not send message because processIf evaluated to empty.')
            return

    if 'resendBucket' in config:
        if not _legacy_resend_permits_send(logger, config, jinja_environment,
                                           template_variables):
            return

    # Checked only now, after the steps above, to keep the original ordering
    # of side effects and errors.
    if 'outputs' not in config:
        raise NoOutputsConfiguredException('No outputs configured!')

    for output_config in config['outputs']:
        if 'type' not in output_config:
            raise NoTypeConfiguredException('No type configured for output!')
        output_type = output_config['type']

        if 'processIf' in output_config:
            processif_contents = _render_legacy_template(
                jinja_environment, output_config['processIf'], 'processif')
            if processif_contents.strip() == '':
                logger.info(
                    'Will not use output processor %s because processIf evaluated to empty.'
                    % output_type)
                continue

        logger.debug('Processing message using output processor: %s' %
                     output_type)
        mod = __import__('output.%s' % output_type)
        output_module = getattr(mod, output_type)
        output_class = getattr(output_module,
                               '%sOutput' % output_type.capitalize())
        output_instance = output_class(config, output_config,
                                       jinja_environment, data, event,
                                       context)
        try:
            output_instance.output()
        except Exception:
            if len(config['outputs']) > 1:
                # With several outputs a failure is tolerated unless the
                # configuration demands that all of them succeed.
                logger.error('Output processor %s failed, trying next...' %
                             output_type,
                             extra={'exception': traceback.format_exc()})
                if 'allOutputsMustSucceed' in config and config[
                        'allOutputsMustSucceed']:
                    raise
            else:
                logger.error('Output processor %s failed.' % output_type,
                             extra={'exception': traceback.format_exc()})
                raise


def _render_legacy_template(jinja_environment, source, name):
    """Compile ``source`` with ``jinja_environment``, label it and render it.

    ``render()`` is called without arguments, so the template only sees what
    the environment itself provides (e.g. its globals).

    Args:
        jinja_environment: Environment used to compile the template.
        source: Template source text.
        name: Value stored in the compiled template's ``name`` attribute.

    Returns:
        The rendered template text.
    """
    template = jinja_environment.from_string(source)
    template.name = name
    return template.render()


def _legacy_resend_permits_send(logger, config, jinja_environment,
                                template_variables):
    """Decide, via a marker object in ``resendBucket``, whether to send now.

    The marker's object name is the SHA-256 hex digest of either the rendered
    ``config['resendKey']`` template or, when no key is configured, the JSON
    dump of ``template_variables`` without its ``'context'`` entry.

    Args:
        logger: Logger used for progress reporting.
        config: Pipeline configuration; ``resendBucket`` must be present.
        jinja_environment: Environment used to render ``resendKey``.
        template_variables: Variables collected so far (not modified).

    Returns:
        True when the message may be sent (marker newly created, or resend
        period elapsed and marker rewritten); False when the period has not
        elapsed or the marker creation lost a race.

    Raises:
        NoResendConfigException: ``resendPeriod`` is missing from ``config``.
        Exception: Any marker-creation error other than a failed precondition.
    """
    if 'resendPeriod' not in config:
        raise NoResendConfigException(
            'No resendPeriod configured, even though resendBucket is set!')

    resend_key_hash = hashlib.sha256()
    if 'resendKey' in config:
        key_contents = _render_legacy_template(jinja_environment,
                                               config['resendKey'], 'resend')
        resend_key_hash.update(key_contents.encode('utf-8'))
    else:
        # Work on a copy so the caller's variables keep their 'context'.
        default_resend_key = template_variables.copy()
        default_resend_key.pop('context')
        resend_key_hash.update(
            json.dumps(default_resend_key).encode('utf-8'))
    resend_file = resend_key_hash.hexdigest()

    logger.debug('Checking for resend object in bucket...',
                 extra={
                     'bucket': config['resendBucket'],
                     'blob': resend_file
                 })
    storage_client = storage.Client(client_info=get_grpc_client_info())
    bucket = storage_client.bucket(config['resendBucket'])
    resend_blob = bucket.blob(resend_file)

    if not resend_blob.exists():
        try:
            # if_generation_match=0 makes the upload conditional, so that a
            # concurrent creation of the same marker is detected below.
            resend_blob.upload_from_string('', if_generation_match=0)
        except Exception as exc:
            # Handle TOCTOU condition
            # NOTE(review): the failure is recognised by matching the error
            # text; a dedicated exception type may be preferable - confirm.
            if 'conditionNotMet' in str(exc):
                logger.warning(
                    'Message (re)sending already in progress (resend key already exist).',
                    extra={'exception': exc})
                return False
            raise
        return True

    # Refresh the blob's metadata before reading time_created.
    resend_blob.reload()
    resend_period = config['resendPeriod']
    resend_period_parsed = parsedatetime.Calendar(
        version=parsedatetime.VERSION_CONTEXT_STYLE).parse(
            resend_period, sourceTime=resend_blob.time_created)
    # Presumably parse() yields a (time struct, context) pair here; the first
    # element is used whenever the result has more than one item.
    if len(resend_period_parsed) > 1:
        resend_earliest = datetime.fromtimestamp(
            mktime(resend_period_parsed[0]))
    else:
        resend_earliest = datetime.fromtimestamp(
            mktime(resend_period_parsed))

    # NOTE(review): mktime() interprets the struct as local time and the
    # comparison uses a naive datetime.now(); this looks correct only when the
    # process time zone matches that of time_created - confirm.
    if datetime.now() >= resend_earliest:
        logger.debug('Resending the message now.',
                     extra={
                         'resend_earliest': resend_earliest,
                         'blob_time_created': resend_blob.time_created
                     })
        # Rewrite the marker so the resend period starts over.
        resend_blob.upload_from_string('')
        return True

    logger.info(
        'Can\'t resend the message now, resend period not elapsed.',
        extra={
            'resend_earliest': resend_earliest,
            'blob_time_created': resend_blob.time_created
        })
    return False