main.py:
#!/usr/bin/env python3
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import base64
import yaml
import json
import argparse
import logging
import socket
import hashlib
from time import mktime
from datetime import datetime, timezone, timedelta
import parsedatetime
from dateutil import parser
from filters import get_jinja_filters, get_jinja_tests
from jinja2 import Environment, TemplateError
from google.cloud import secretmanager, storage
from pythonjsonlogger import jsonlogger
import traceback
from helpers.base import get_grpc_client_info, Context, BaseHelper
import random
import uuid
from functools import partial
from flask import Response
config_file_name = 'config.yaml'
execution_count = 0
configuration = None
logger = None
extra_vars = []
def load_configuration(file_name):
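    """Load the YAML configuration.

    If the CONFIG environment variable is set, it is treated either as a
    Secret Manager resource name (when it starts with 'projects/') or as a
    path to a bundled configuration file; otherwise file_name is read.
    """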
    if os.getenv('CONFIG'):
logger = logging.getLogger('pubsub2inbox')
secret_manager_url = os.getenv('CONFIG')
if secret_manager_url.startswith('projects/'):
logger.debug('Loading configuration from Secret Manager: %s' %
(secret_manager_url))
client = secretmanager.SecretManagerServiceClient(
client_info=get_grpc_client_info())
response = client.access_secret_version(name=secret_manager_url)
configuration = response.payload.data.decode('UTF-8')
else:
logger.debug('Loading configuration from bundled file: %s' %
(secret_manager_url))
with open(secret_manager_url) as config_file:
configuration = config_file.read()
else:
with open(file_name) as config_file:
configuration = config_file.read()
cfg = yaml.load(configuration, Loader=yaml.SafeLoader)
return cfg
def get_jinja_escaping(template_name):
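    """Autoescape callback: escape only templates with 'html' in the name."""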
if template_name and 'html' in template_name:
return True
return False
def get_jinja_environment():
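    """Build the Jinja2 environment with custom globals, filters and tests.

    Globals include the process environment ('env'), per-request random
    values (req_random_int, req_random_uuid*) and any variables passed on
    the command line via --set.
    """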
env = Environment(autoescape=get_jinja_escaping,
cache_size=0,
extensions=['jinja2.ext.do'])
env.globals = {**env.globals, **{'env': os.environ}}
env.globals['req_random_int'] = random.randrange(0, 9223372036854775807)
request_uuid = uuid.uuid4()
env.globals['req_random_uuid_hex'] = request_uuid.hex
env.globals['req_random_uuid_int'] = request_uuid.int
nice_uuid = base64.urlsafe_b64encode(request_uuid.bytes)
env.globals['req_random_uuid'] = nice_uuid.decode('utf-8').rstrip(
'=').lower(
) # This is not as unique as raw UUID, but still pretty unique
    if extra_vars:
for v in extra_vars:
if v[1].startswith('[') or v[1].startswith('{'):
env.globals[v[0]] = json.loads(v[1])
else:
env.globals[v[0]] = v[1]
env.filters.update(get_jinja_filters())
env.tests.update(get_jinja_tests())
return env
class ConcurrencyRetryException(Exception):
pass
class MessageTooOldException(Exception):
pass
class NoResendConfigException(Exception):
pass
class NoTypeConfiguredException(Exception):
pass
class NoOutputsConfiguredException(Exception):
pass
class NoPipelineConfiguredException(Exception):
pass
class NoTypeInPipelineException(Exception):
pass
class MalformedTypeInPipelineException(Exception):
pass
class NoDataFieldException(Exception):
pass
class NoMessageReceivedException(Exception):
pass
class InvalidMessageFormatException(Exception):
pass
class MalformedGlobalsException(Exception):
pass
class MalformedMacrosException(Exception):
pass
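# Shared helper (factored from three identical blocks in the handlers below):
# builds a Cloud Storage client, pointing it at a local emulator with
# anonymous credentials when STORAGE_EMULATOR_HOST is set.
def get_storage_client():
    if os.getenv('STORAGE_EMULATOR_HOST'):
        from google.auth.credentials import AnonymousCredentials
        anon_credentials = AnonymousCredentials()
        return storage.Client(
            client_info=get_grpc_client_info(),
            client_options={"api_endpoint": os.getenv('STORAGE_EMULATOR_HOST')},
            credentials=anon_credentials)
    return storage.Client(client_info=get_grpc_client_info())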
def check_retry_period(config, context, logger):
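    """Raise MessageTooOldException for messages older than the configured
    retry period (retryPeriod/maximumMessageAge, default '2 days ago';
    set to 'skip' to disable the check)."""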
# Ignore messages submitted before our retry period
retry_period = '2 days ago'
if 'retryPeriod' in config:
retry_period = config['retryPeriod']
if 'maximumMessageAge' in config:
retry_period = config['maximumMessageAge']
if retry_period != 'skip':
        # parsedatetime's parse() returns a (struct_time, context) tuple
        retry_period_parsed, _ = parsedatetime.Calendar(
            version=parsedatetime.VERSION_CONTEXT_STYLE).parse(retry_period)
        retry_earliest = datetime.fromtimestamp(mktime(retry_period_parsed),
                                                timezone.utc)
message_time = parser.parse(context.timestamp)
if (message_time - retry_earliest) < timedelta(0, 0):
logger.warning(
'Ignoring message because it\'s past the retry period.',
extra={
'event_id': context.event_id,
'retry_period': retry_period,
'retry_earliest': retry_earliest.strftime('%c'),
'event_timestamp': message_time
})
raise MessageTooOldException(
'Ignoring message because it\'s past the retry period.')
def process_message_legacy(logger, config, data, event, context):
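    """Process a message using the legacy configuration format: run the
    configured 'processors' to build up template variables, then apply
    processIf/resendBucket gating and hand the result to the 'outputs'."""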
template_variables = {
'data': data,
'event': event,
'context': context,
}
jinja_environment = get_jinja_environment()
if 'processors' in config:
for processor in config['processors']:
config_key = None
output_var = None
if isinstance(processor, dict):
config_key = processor[
'config'] if 'config' in processor else None
if config_key:
# Expand config key if it's a Jinja expression
config_key_template = jinja_environment.from_string(
config_key)
config_key_template.name = 'config'
config_key = config_key_template.render()
output_var = processor[
'output'] if 'output' in processor else None
if output_var:
if isinstance(output_var, str):
# Expand output variable if it's a Jinja expression
output_var_template = jinja_environment.from_string(
output_var)
output_var_template.name = 'output'
output_var = output_var_template.render()
elif isinstance(output_var, dict):
new_output_var = {}
for k, v in output_var.items():
output_var_template = jinja_environment.from_string(
v)
output_var_template.name = 'output'
new_output_var[k] = output_var_template.render()
output_var = new_output_var
processor = processor['processor']
logger.debug('Processing message using input processor: %s' %
processor)
mod = __import__('processors.%s' % processor)
processor_module = getattr(mod, processor)
processor_class = getattr(processor_module,
'%sProcessor' % processor.capitalize())
if not config_key:
config_key = processor_class.get_default_config_key()
processor_config = {}
if config_key in config:
processor_config = config[config_key]
processor_instance = processor_class(processor_config,
jinja_environment, data, event,
context)
if output_var:
processor_variables = processor_instance.process(
output_var=output_var)
else:
processor_variables = processor_instance.process()
template_variables.update(processor_variables)
jinja_environment.globals = {
**jinja_environment.globals,
**template_variables
}
if 'processIf' in config:
processif_template = jinja_environment.from_string(config['processIf'])
processif_template.name = 'processif'
processif_contents = processif_template.render()
if processif_contents.strip() == '':
logger.info(
'Will not send message because processIf evaluated to empty.')
return
if 'resendBucket' in config:
if 'resendPeriod' not in config:
raise NoResendConfigException(
'No resendPeriod configured, even though resendBucket is set!')
resend_key_hash = hashlib.sha256()
if 'resendKey' not in config:
default_resend_key = template_variables.copy()
default_resend_key.pop('context')
resend_key_hash.update(
json.dumps(default_resend_key).encode('utf-8'))
else:
key_template = jinja_environment.from_string(config['resendKey'])
key_template.name = 'resend'
key_contents = key_template.render()
resend_key_hash.update(key_contents.encode('utf-8'))
resend_file = resend_key_hash.hexdigest()
logger.debug('Checking for resend object in bucket...',
extra={
'bucket': config['resendBucket'],
'blob': resend_file
})
storage_client = storage.Client(client_info=get_grpc_client_info())
bucket = storage_client.bucket(config['resendBucket'])
resend_blob = bucket.blob(resend_file)
if resend_blob.exists():
resend_blob.reload()
resend_period = config['resendPeriod']
            # parse() returns a (struct_time, context) tuple
            resend_period_parsed, _ = parsedatetime.Calendar(
                version=parsedatetime.VERSION_CONTEXT_STYLE).parse(
                    resend_period, sourceTime=resend_blob.time_created)
            resend_earliest = datetime.fromtimestamp(
                mktime(resend_period_parsed))
if datetime.now() >= resend_earliest:
logger.debug('Resending the message now.',
extra={
'resend_earliest': resend_earliest,
'blob_time_created': resend_blob.time_created
})
resend_blob.upload_from_string('')
else:
logger.info(
'Can\'t resend the message now, resend period not elapsed.',
extra={
'resend_earliest': resend_earliest,
'blob_time_created': resend_blob.time_created
})
return
else:
try:
resend_blob.upload_from_string('', if_generation_match=0)
except Exception as exc:
# Handle TOCTOU condition
if 'conditionNotMet' in str(exc):
logger.warning(
                    'Message (re)sending already in progress (resend key already exists).',
extra={'exception': exc})
return
else:
raise exc
return
if 'outputs' in config:
for output_config in config['outputs']:
if 'type' not in output_config:
raise NoTypeConfiguredException(
'No type configured for output!')
if 'processIf' in output_config:
processif_template = jinja_environment.from_string(
output_config['processIf'])
processif_template.name = 'processif'
processif_contents = processif_template.render()
if processif_contents.strip() == '':
logger.info(
'Will not use output processor %s because processIf evaluated to empty.'
% output_config['type'])
continue
logger.debug('Processing message using output processor: %s' %
output_config['type'])
output_type = output_config['type']
mod = __import__('output.%s' % output_type)
output_module = getattr(mod, output_type)
output_class = getattr(output_module,
'%sOutput' % output_type.capitalize())
output_instance = output_class(config, output_config,
jinja_environment, data, event,
context)
try:
output_instance.output()
except Exception as exc:
if len(config['outputs']) > 1:
logger.error('Output processor %s failed, trying next...' %
(output_type),
extra={'exception': traceback.format_exc()})
if 'allOutputsMustSucceed' in config and config[
'allOutputsMustSucceed']:
raise exc
else:
logger.error('Output processor %s failed.' % (output_type),
extra={'exception': traceback.format_exc()})
raise exc
else:
raise NoOutputsConfiguredException('No outputs configured!')
def handle_ignore_on(logger, ignore_config, jinja_environment,
template_variables):
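    """Implement the ignoreOn mechanism via a marker object in Cloud Storage.

    The marker name is the SHA-256 hash of the rendered key (or of the
    template variables when no key is configured). Returns True if the
    message should be processed, False if it should be ignored.
    """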
if 'bucket' not in ignore_config:
raise NoResendConfigException(
'No Cloud Storage bucket configured, even though ignoreOn is set!')
if 'period' not in ignore_config:
raise NoResendConfigException(
'No period configured, even though ignoreOn is set!')
resend_key_hash = hashlib.sha256()
if 'key' not in ignore_config:
default_resend_key = template_variables.copy()
default_resend_key.pop('context')
resend_key_hash.update(json.dumps(default_resend_key).encode('utf-8'))
else:
key_template = jinja_environment.from_string(ignore_config['key'])
key_template.name = 'resend'
key_contents = key_template.render()
resend_key_hash.update(key_contents.encode('utf-8'))
resend_file = resend_key_hash.hexdigest()
logger.debug('Checking for ignore object in bucket...',
extra={
'bucket': ignore_config['bucket'],
'blob': resend_file
})
    storage_client = get_storage_client()
bucket = storage_client.bucket(ignore_config['bucket'])
resend_blob = bucket.blob(resend_file)
if resend_blob.exists():
resend_blob.reload()
resend_period = ignore_config['period']
        # parse() returns a (struct_time, context) tuple
        resend_period_parsed, _ = parsedatetime.Calendar(
            version=parsedatetime.VERSION_CONTEXT_STYLE).parse(
                resend_period, sourceTime=resend_blob.time_created)
        resend_earliest = datetime.fromtimestamp(mktime(resend_period_parsed))
if datetime.utcnow() >= resend_earliest:
logger.info('Ignore period elapsed, reprocessing the message now.',
extra={
'resend_earliest': resend_earliest,
'blob_time_created': resend_blob.time_created
})
resend_blob.upload_from_string('')
else:
logger.info(
'Ignore period not elapsed, not reprocessing the message.',
extra={
'resend_earliest': resend_earliest,
'blob_time_created': resend_blob.time_created
})
return False
else:
try:
resend_blob.upload_from_string('', if_generation_match=0)
except Exception as exc:
# Handle TOCTOU condition
if 'conditionNotMet' in str(exc):
logger.warning(
                    'Message processing already in progress (message ignore key already exists).',
extra={'exception': exc})
return False
else:
raise exc
return True
def get_concurrency_params(concurrency_config, jinja_environment,
template_variables):
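    """Render the concurrency bucket name and lock file name from the
    configuration (the lock file defaults to 'pubsub2inbox.lock')."""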
if 'bucket' not in concurrency_config:
raise NoResendConfigException(
'No Cloud Storage bucket configured, even though concurrency is set!'
)
bucket_template = jinja_environment.from_string(
concurrency_config['bucket'])
bucket_template.name = 'concurrency'
concurrency_bucket = bucket_template.render()
if 'file' in concurrency_config:
file_template = jinja_environment.from_string(
concurrency_config['file'])
file_template.name = 'concurrency'
concurrency_file = file_template.render()
else:
concurrency_file = 'pubsub2inbox.lock'
return concurrency_bucket, concurrency_file
def handle_concurrency_post(logger, concurrency_config, jinja_environment,
template_variables):
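    """Release the concurrency lock by deleting the lock object."""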
concurrency_bucket, concurrency_file = get_concurrency_params(
concurrency_config, jinja_environment, template_variables)
logger.debug('Removing concurrency lock object from bucket...',
extra={
'bucket': concurrency_bucket,
'blob': concurrency_file
})
    storage_client = get_storage_client()
bucket = storage_client.bucket(concurrency_bucket)
concurrency_blob = bucket.blob(concurrency_file)
concurrency_blob.delete()
def handle_concurrency_pre(logger, concurrency_config, jinja_environment,
template_variables):
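    """Try to acquire the concurrency lock before processing a message.

    The lock is a Cloud Storage object created with if_generation_match=0,
    so only one writer can create it. Returns True if processing may
    proceed, False otherwise; raises ConcurrencyRetryException when 'defer'
    is set, so that Pub/Sub will redeliver the message.
    """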
concurrency_bucket, concurrency_file = get_concurrency_params(
concurrency_config, jinja_environment, template_variables)
logger.debug('Checking if concurrency lock file exists in bucket...',
extra={
'bucket': concurrency_bucket,
'blob': concurrency_file
})
    storage_client = get_storage_client()
bucket = storage_client.bucket(concurrency_bucket)
concurrency_blob = bucket.blob(concurrency_file)
if concurrency_blob.exists():
if 'period' in concurrency_config:
concurrency_blob.reload()
concurrency_period = concurrency_config['period']
            # parse() returns a (struct_time, context) tuple
            concurrency_period_parsed, _ = parsedatetime.Calendar(
                version=parsedatetime.VERSION_CONTEXT_STYLE).parse(
                    concurrency_period,
                    sourceTime=concurrency_blob.time_created)
            concurrency_earliest = datetime.fromtimestamp(
                mktime(concurrency_period_parsed))
if datetime.utcnow() >= concurrency_earliest:
logger.info(
'Concurrency lock period elapsed, continuing with message processing.',
extra={
'process_earliest': concurrency_earliest,
'blob_time_created': concurrency_blob.time_created
})
concurrency_blob.upload_from_string('')
return True
else:
logger.info(
'Concurrency lock period not elapsed, not processing the message.',
extra={
'process_earliest': concurrency_earliest,
'blob_time_created': concurrency_blob.time_created
})
else:
logger.info(
'Concurrency lock file exists, not processing the message.',
extra={
'bucket': concurrency_bucket,
'blob': concurrency_file
})
if 'defer' in concurrency_config and concurrency_config['defer']:
raise ConcurrencyRetryException(
'Failing message processing due to concurrency control, allowing retry.'
)
return False
else:
try:
concurrency_blob.upload_from_string('', if_generation_match=0)
except Exception as exc:
# Handle TOCTOU condition
if 'conditionNotMet' in str(exc):
logger.warning(
'Message processing already in progress (concurrency lock file exists).',
extra={'exception': exc})
return False
else:
raise exc
return True
def macro_helper(macro_func, *args, **kwargs):
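    """Call a user-defined macro and coerce its string output: results that
    look like a list or a dict are evaluated into Python objects (macro
    output is trusted template content), everything else is returned
    as-is."""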
r = macro_func(*args, **kwargs)
try:
if r.strip().startswith('[') or r.strip().startswith('{'):
e = eval(r)
return e
else:
return r
except SyntaxError: # Probably a string, huh.
return r
def process_message_pipeline(logger,
config,
data,
event,
context,
using_webserver=False):
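    """Process a message using the pipeline configuration format.

    Sets up macros and globals, applies concurrency and ignoreOn gating,
    then runs each 'processor.*' or 'output.*' task in order, honoring
    stopIf/runIf/canFail and the global onError task.
    """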
template_variables = {
'data': data,
'event': event,
'context': context,
'using_webserver': using_webserver
}
if len(config['pipeline']) == 0:
raise NoPipelineConfiguredException('Empty pipeline configured!')
jinja_environment = get_jinja_environment()
jinja_environment.globals = {
**jinja_environment.globals,
**template_variables
}
helper = BaseHelper(jinja_environment)
if 'macros' in config:
if not isinstance(config['macros'], list):
raise MalformedMacrosException(
'"macros" in configuration should be a list.')
macros = {}
for macro in config['macros']:
macro_template = jinja_environment.from_string(
macro['macro'].strip())
macro_template.name = 'macro'
macro_template.render()
macro_module = macro_template.module
for f in dir(macro_module):
if not f.startswith("_") and callable(getattr(macro_module, f)):
macro_func = getattr(macro_module, f)
macros[f] = partial(macro_helper, macro_func)
jinja_environment.globals.update(macros)
if 'globals' in config:
if not isinstance(config['globals'], dict):
raise MalformedGlobalsException(
'"globals" in configuration should be a dictionary.')
template_globals = helper._jinja_expand_dict_all(
config['globals'], 'globals')
jinja_environment.globals = {
**jinja_environment.globals,
**template_globals
}
if 'concurrency' in config:
if not handle_concurrency_pre(logger, config['concurrency'],
jinja_environment, template_variables):
return
if 'ignoreOn' in config:
if not handle_ignore_on(logger, config['ignoreOn'], jinja_environment,
template_variables):
return
task_number = 1
for task in config['pipeline']:
if 'type' not in task or not task['type']:
raise NoTypeInPipelineException('No type in pipeline task #%d: %s' %
(task_number, str(task)))
description = ' '
if 'description' in task:
description = ' "%s" ' % (task['description'])
        try:
            task_type, task_handler = task['type'].split('.', 2)
        except ValueError:
            raise MalformedTypeInPipelineException(
                'Malformed type in pipeline task #%d: %s' %
                (task_number, str(task)))
        if not task_type or not task_handler or task_type not in [
                'processor', 'output'
        ]:
            raise MalformedTypeInPipelineException(
                'Malformed type in pipeline task #%d: %s' %
                (task_number, str(task)))
task_config = {}
if 'config' in task:
task_config = task['config']
# Handle resend prevention mechanism
if 'ignoreOn' in task:
if not handle_ignore_on(logger, task['ignoreOn'], jinja_environment,
template_variables):
return
# Handle stop processing mechanism
if 'stopIf' in task:
stopif_template = jinja_environment.from_string(task['stopIf'])
stopif_template.name = 'stopif'
stopif_contents = stopif_template.render()
if stopif_contents.strip() != '':
jinja_environment.globals['previous_run'] = False
logger.info(
'Pipeline task%s#%d (%s) stop-if condition evaluated to true (non-empty), stopping processing.'
% (description, task_number, task['type']))
helper._clean_tempdir()
return
# Handle conditional execution mechanism
if 'runIf' in task:
runif_template = jinja_environment.from_string(task['runIf'])
runif_template.name = 'runif'
runif_contents = runif_template.render()
if runif_contents.strip() == '':
jinja_environment.globals['previous_run'] = False
logger.info(
'Pipeline task%s#%d (%s) run-if condition evaluated to false (empty), skipping task.'
% (description, task_number, task['type']))
task_number += 1
continue
# Process task wide variables
if 'variables' in task:
for k, v in task['variables'].items():
if isinstance(v, dict):
jinja_environment.globals[
k] = helper._jinja_expand_dict_all(v, 'variable')
elif isinstance(v, list):
jinja_environment.globals[k] = helper._jinja_expand_list(
v, 'variable')
elif isinstance(v, int):
jinja_environment.globals[k] = helper._jinja_expand_int(
v, 'variable')
else:
jinja_environment.globals[k] = helper._jinja_expand_string(
v, 'variable')
try:
# Handle output variable expansion
output_var = task['output'] if 'output' in task else None
if output_var:
if isinstance(output_var, str):
# Expand output variable if it's a Jinja expression
output_var_template = jinja_environment.from_string(
output_var)
output_var_template.name = 'output'
output_var = output_var_template.render()
elif isinstance(output_var, dict):
new_output_var = {}
for k, v in output_var.items():
output_var_template = jinja_environment.from_string(v)
output_var_template.name = 'output'
new_output_var[k] = output_var_template.render()
output_var = new_output_var
# Handle the actual work
if task_type == 'processor': # Handle processor
processor = task_handler
logger.debug(
'Pipeline task%s#%d (%s), running processor: %s' %
(description, task_number, task['type'], processor))
mod = __import__('processors.%s' % processor)
processor_module = getattr(mod, processor)
processor_class = getattr(
processor_module, '%sProcessor' % processor.capitalize())
processor_instance = processor_class(task_config,
jinja_environment, data,
event, context)
if output_var:
processor_variables = processor_instance.process(
output_var=output_var)
else:
processor_variables = processor_instance.process()
template_variables.update(processor_variables)
template_variables['previous_run'] = True
jinja_environment.globals = {
**jinja_environment.globals,
**template_variables
}
elif task_type == 'output': # Handle output
output_type = task_handler
logger.debug(
'Pipeline task%s#%d (%s), running output: %s' %
(description, task_number, task['type'], output_type))
mod = __import__('output.%s' % output_type)
output_module = getattr(mod, output_type)
output_class = getattr(output_module,
'%sOutput' % output_type.capitalize())
output_instance = output_class(task_config, task_config,
jinja_environment, data, event,
context)
output_instance.output()
# HTTP response
if output_instance.status_code and output_instance.body:
context.http_response = (output_instance.status_code,
output_instance.headers,
output_instance.body)
jinja_environment.globals['previous_run'] = True
except Exception as exc:
jinja_environment.globals['previous_run'] = False
if 'canFail' not in task or not task['canFail']:
# Global output if a task fails
if 'onError' in config:
error_task = config['onError']
if 'type' not in error_task or not error_task['type']:
raise NoTypeInPipelineException(
'No type in pipeline onError task')
jinja_environment.globals['exception'] = str(exc)
error_task_type, error_task_handler = error_task[
'type'].split('.', 2)
error_task_config = {}
if 'config' in error_task:
error_task_config = error_task['config']
output_type = error_task_handler
logger.debug(
'Pipeline onError task (%s), running output: %s' %
(error_task['type'], output_type))
mod = __import__('output.%s' % output_type)
output_module = getattr(mod, output_type)
output_class = getattr(
output_module, '%sOutput' % output_type.capitalize())
output_instance = output_class(error_task_config,
error_task_config,
jinja_environment, data,
event, context)
output_instance.output()
logger.error(
'Pipeline task%s#%d (%s) failed, stopping processing.' %
(description, task_number, task['type']),
extra={'exception': traceback.format_exc()})
if 'canFail' in config and config['canFail']:
                    logger.warning(
'Pipeline failed, but it is allowed to fail. Message processed.'
)
if 'concurrency' in config:
handle_concurrency_post(logger, config['concurrency'],
jinja_environment,
template_variables)
helper._clean_tempdir()
return
else:
if 'concurrency' in config:
handle_concurrency_post(logger, config['concurrency'],
jinja_environment,
template_variables)
helper._clean_tempdir()
raise exc
else:
logger.warning(
'Pipeline task%s#%d (%s) failed, but continuing...' %
(description, task_number, task['type']),
extra={'exception': traceback.format_exc()})
task_number += 1
if 'concurrency' in config:
handle_concurrency_post(logger, config['concurrency'],
jinja_environment, template_variables)
helper._clean_tempdir()
def process_message(config, data, event, context, using_webserver=False):
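    """Dispatch to the pipeline processor when a 'pipeline' list is
    configured, otherwise to the legacy processor."""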
logger = logging.getLogger('pubsub2inbox')
check_retry_period(config, context, logger)
if 'pipeline' in config and isinstance(config['pipeline'], list):
process_message_pipeline(logger, config, data, event, context,
using_webserver)
else:
process_message_legacy(logger, config, data, event, context)
def decode_and_process(logger, config, event, context, using_webserver=False):
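    """Validate and base64-decode the Pub/Sub payload, then process it."""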
if 'data' not in event:
raise NoDataFieldException('No data field in Pub/Sub message!')
logger.debug('Decoding Pub/Sub message...',
extra={
'event_id': context.event_id,
'timestamp': context.timestamp,
'hostname': socket.gethostname(),
'pid': os.getpid()
})
if event['data'] != '':
data = base64.b64decode(event['data']).decode('raw_unicode_escape')
else:
data = None
logger.debug('Starting Pub/Sub message processing...',
extra={
'event_id': context.event_id,
'data': data,
'attributes': event['attributes']
})
process_message(config, data, event, context, using_webserver)
logger.debug('Pub/Sub message processing finished.',
extra={'event_id': context.event_id})
def process_pubsub(event,
context,
message_too_old_exception=False,
using_webserver=False):
"""Function that is triggered by Pub/Sub incoming message.
Args:
event (dict): The dictionary with data specific to this type of
event. The `data` field contains the PubsubMessage message. The
`attributes` field will contain custom attributes if there are any.
context (google.cloud.functions.Context): The Cloud Functions event
metadata. The `event_id` field contains the Pub/Sub message ID. The
`timestamp` field contains the publish time.
"""
global execution_count, configuration, logger
execution_count += 1
if not logger:
logger = setup_logging()
if using_webserver:
logger.debug('Received an API call.',
extra={
'event_id': context.event_id,
'timestamp': context.timestamp,
'hostname': socket.gethostname(),
'pid': os.getpid(),
'execution_count': execution_count
})
else:
logger.debug('Received a Pub/Sub message.',
extra={
'event_id': context.event_id,
'timestamp': context.timestamp,
'hostname': socket.gethostname(),
'pid': os.getpid(),
'execution_count': execution_count
})
socket.setdefaulttimeout(10)
if not configuration:
configuration = load_configuration(config_file_name)
try:
decode_and_process(logger, configuration, event, context,
using_webserver)
except TemplateError as exc:
logger.error('Error while evaluating a Jinja2 template!',
extra={
'error_message': exc,
'error': str(exc),
})
raise exc
    except MessageTooOldException as mtoe:
        if message_too_old_exception:
            raise mtoe
def process_pubsub_v2(event, context, message_too_old_exception=False):
"""Function that is triggered by Pub/Sub incoming message for functions V2.
Args:
event (dict): The dictionary with data specific to this type of
event. The `data` field contains the PubsubMessage message. The
`attributes` field will contain custom attributes if there are any.
context (google.cloud.functions.Context): The Cloud Functions event
metadata. The `event_id` field contains the Pub/Sub message ID. The
`timestamp` field contains the publish time.
"""
global logger
if not logger:
logger = setup_logging()
new_context = Context(eventId=context.event_id,
timestamp=context.timestamp,
eventType=context.event_type,
resource=context.resource)
if 'attributes' not in event:
event['attributes'] = {}
if 'data' not in event:
event['data'] = ''
process_pubsub(event, new_context)
def process_api_v2(request):
"""Function that is triggered by API request for functions V2.
"""
global logger
if not logger:
logger = setup_logging()
request_headers = json.loads(json.dumps({**request.headers}))
if 'authorization' in request_headers:
del request_headers['authorization']
event = {
'data':
base64.b64encode(json.dumps(request.get_json()).encode('utf-8')),
'attributes': {
'headers': request_headers
}
}
    # Use UTC so the timestamp matches its 'Z' (UTC) suffix
    context = Context(
        timestamp=datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ'))
process_pubsub(event,
context,
message_too_old_exception=True,
using_webserver=True)
if context.http_response:
return Response(response=context.http_response[2],
status=context.http_response[0],
headers=context.http_response[1])
return Response(response="", status=200)
class CloudRunServer:
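    """Falcon resource serving Pub/Sub push messages on / and direct API
    calls on /api when running in web server mode (e.g. on Cloud Run)."""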
def on_get(self, req, res):
try:
import falcon
res.content_type = falcon.MEDIA_TEXT
res.status = falcon.HTTP_400
res.text = 'Bad Request: expecting POST'
except ImportError:
logger.error(
'Falcon is required for web server mode, run: pip install falcon'
)
def on_post(self, req, res):
try:
import falcon
res.content_type = falcon.MEDIA_TEXT
# Check if this is an API call
context = None
event = None
using_webserver = False
if req.url[-4:] != '/api':
try:
envelope = req.media
except falcon.MediaNotFoundError:
raise NoMessageReceivedException(
'No Pub/Sub message received')
except falcon.MediaMalformedError:
raise InvalidMessageFormatException('Invalid Pub/Sub JSON')
if not isinstance(envelope, dict) or 'message' not in envelope:
raise InvalidMessageFormatException(
'Invalid Pub/Sub message format')
event = {
'data':
envelope['message']['data'],
'attributes':
envelope['message']['attributes']
if 'attributes' in envelope['message'] else {}
}
context = Context(eventId=envelope['message']['messageId'],
timestamp=envelope['message']['publishTime'])
else:
try:
envelope = req.media
except falcon.MediaNotFoundError:
raise NoMessageReceivedException('No request received')
except falcon.MediaMalformedError:
raise InvalidMessageFormatException('Invalid Pub/Sub JSON')
request_headers = req.headers
if 'authorization' in request_headers:
del request_headers['authorization']
event = {
'data':
base64.b64encode(json.dumps(envelope).encode('utf-8')),
'attributes': {
'headers': request_headers
}
}
                # Use UTC so the timestamp matches its 'Z' (UTC) suffix
                context = Context(
                    timestamp=datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ'))
using_webserver = True
process_pubsub(event,
context,
message_too_old_exception=True,
using_webserver=using_webserver)
if context.http_response:
res.status = context.http_response[0]
res.set_headers(context.http_response[1])
res.text = context.http_response[2]
else:
res.status = falcon.HTTP_200
res.text = 'Message processed.'
except (NoMessageReceivedException,
InvalidMessageFormatException) as me:
# Do not attempt to retry malformed messages
logger.error('%s' % (me),
extra={'exception': traceback.format_exc()})
res.status = falcon.HTTP_204
res.text = 'Bad Request: %s' % (str(me))
except MessageTooOldException as mtoe:
res.status = falcon.HTTP_202
res.text = 'Message ignored: %s' % (mtoe)
except ImportError:
logger.error(
'Falcon is required for web server mode, run: pip install falcon'
)
except Exception as e:
traceback.print_exc()
res.status = falcon.HTTP_500
res.text = 'Internal Server Error: %s' % (e)
def setup_logging():
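    """Configure the 'pubsub2inbox' logger to emit JSON log lines, honoring
    the LOG_LEVEL environment variable (defaults to INFO)."""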
logger = logging.getLogger('pubsub2inbox')
    if os.getenv('LOG_LEVEL'):
logger.setLevel(int(os.getenv('LOG_LEVEL')))
else:
logger.setLevel(logging.INFO)
json_handler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter()
json_handler.setFormatter(formatter)
logger.addHandler(json_handler)
return logger
def run_webserver(run_locally=False):
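    """Build the Falcon application; when run_locally is set, also serve it
    with waitress on PORT (default 8080)."""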
global logger
if not logger:
logger = setup_logging()
try:
import falcon
app = falcon.App()
server = CloudRunServer()
app.add_route('/', server)
app.add_route('/api', server)
if run_locally:
from waitress import serve
            port = int(os.getenv('PORT') or 8080)
serve(app, listen='*:%d' % (port))
return app
except ImportError:
logger.error(
            'Falcon and waitress are required for web server mode, run: pip install falcon waitress'
)
app = None
if os.getenv('WEBSERVER') == '1':
app = run_webserver()
if __name__ == '__main__':
arg_parser = argparse.ArgumentParser(
description='Pub/Sub to Inbox, turn Pub/Sub messages into emails')
arg_parser.add_argument('--config', type=str, help='Configuration file')
arg_parser.add_argument(
'--ignore-period',
action='store_true',
help='Ignore the message timestamp (for skipping retry period)')
arg_parser.add_argument('--webserver',
action='store_true',
help='Run the function as a web server')
arg_parser.add_argument('message',
type=str,
nargs='?',
help='JSON file containing the message(s)')
arg_parser.add_argument('--set',
nargs='*',
                            type=lambda s: tuple(s.split('=', 1)),
                            help='Set a Jinja variable from the command line')
args = arg_parser.parse_args()
if args.config:
config_file_name = args.config
if args.set:
extra_vars = args.set
if args.webserver or os.getenv('WEBSERVER') == '1':
run_webserver(True)
else:
        if not args.message:
            # error() prints the usage message and exits
            arg_parser.error(
                'Specify a file containing the message to process on the command line.'
            )
with open(args.message) as f:
contents = f.read()
messages = json.loads(contents)
for message in messages:
event = {
'data':
message['message']['data']
if 'data' in message['message'] else '',
'attributes':
message['message']['attributes']
if 'attributes' in message['message'] else {}
}
context = Context(eventId=message['message']['messageId'],
timestamp=message['message']['publishTime'])
if args.ignore_period:
context.timestamp = datetime.utcnow().strftime(
'%Y-%m-%dT%H:%M:%S.%fZ')
process_pubsub(event, context)
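# Example local invocation (hypothetical file names): process a JSON file of
# Pub/Sub-style envelopes against a configuration, ignoring the retry period:
#
#   python3 main.py --config config.yaml --ignore-period messages.json
#
# where messages.json contains a list of envelopes such as:
#   [{"message": {"data": "<base64 payload>", "messageId": "1",
#                 "publishTime": "2023-01-01T00:00:00Z"}}]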