in main.py [0:0]
def _ignore_on_key_digest(ignore_config, jinja_environment,
                          template_variables):
    """Return the SHA-256 hex digest that names the ignore object.

    When ``ignore_config`` has a ``key`` entry, it is rendered as a Jinja
    template and the rendered text is hashed. Otherwise the JSON dump of
    ``template_variables`` (minus its ``context`` entry, if any) is hashed.
    """
    digest = hashlib.sha256()
    if 'key' in ignore_config:
        key_template = jinja_environment.from_string(ignore_config['key'])
        key_template.name = 'resend'
        # render() is called without arguments, so the template only sees
        # what the Jinja environment itself exposes.
        digest.update(key_template.render().encode('utf-8'))
    else:
        default_resend_key = template_variables.copy()
        # Tolerate a missing 'context' entry instead of raising KeyError.
        default_resend_key.pop('context', None)
        digest.update(json.dumps(default_resend_key).encode('utf-8'))
    return digest.hexdigest()


def _ignore_on_storage_client():
    """Build the Cloud Storage client used for ignore objects.

    If the ``STORAGE_EMULATOR_HOST`` environment variable is set, the client
    is pointed at that endpoint with anonymous credentials.
    """
    emulator_host = os.getenv('STORAGE_EMULATOR_HOST')
    if emulator_host:
        from google.auth.credentials import AnonymousCredentials
        return storage.Client(
            client_info=get_grpc_client_info(),
            client_options={"api_endpoint": emulator_host},
            credentials=AnonymousCredentials())
    return storage.Client(client_info=get_grpc_client_info())


def handle_ignore_on(logger, ignore_config, jinja_environment,
                     template_variables):
    """Decide whether a message should be processed or ignored.

    A marker object, named after a hash of the message key, is kept in the
    configured Cloud Storage bucket. If no marker exists, one is created and
    the message is processed. If a marker exists, the message is processed
    again only once the configured period, counted from the marker's
    creation time, has elapsed; the marker is then rewritten.

    Args:
        logger: Logger used for debug/info/warning output.
        ignore_config: Mapping with required ``bucket`` and ``period``
            entries and an optional ``key`` Jinja template string.
        jinja_environment: Jinja environment used to render ``key``.
        template_variables: Mapping hashed as the default key when ``key``
            is not configured.

    Returns:
        True if the message should be processed, False if it should be
        ignored.

    Raises:
        NoResendConfigException: If ``bucket`` or ``period`` is missing
            from ``ignore_config``.
    """
    from time import struct_time

    if 'bucket' not in ignore_config:
        raise NoResendConfigException(
            'No Cloud Storage bucket configured, even though ignoreOn is set!')
    if 'period' not in ignore_config:
        raise NoResendConfigException(
            'No period configured, even though ignoreOn is set!')

    resend_file = _ignore_on_key_digest(ignore_config, jinja_environment,
                                        template_variables)
    logger.debug('Checking for ignore object in bucket...',
                 extra={
                     'bucket': ignore_config['bucket'],
                     'blob': resend_file
                 })

    storage_client = _ignore_on_storage_client()
    bucket = storage_client.bucket(ignore_config['bucket'])
    resend_blob = bucket.blob(resend_file)

    if not resend_blob.exists():
        try:
            # if_generation_match=0 makes the upload succeed only when the
            # object does not exist yet.
            resend_blob.upload_from_string('', if_generation_match=0)
        except Exception as exc:
            # Handle TOCTOU condition: the object appeared between the
            # exists() check and the upload.
            if 'conditionNotMet' not in str(exc):
                raise
            logger.warning(
                'Message processing already in progress (message ignore key already exist).',
                extra={'exception': exc})
            return False
        return True

    # Refresh metadata so that time_created is populated.
    resend_blob.reload()
    resend_period_parsed = parsedatetime.Calendar(
        version=parsedatetime.VERSION_CONTEXT_STYLE).parse(
            ignore_config['period'], sourceTime=resend_blob.time_created)
    # parse() normally yields a (struct_time, context) pair. A bare
    # struct_time also has a length greater than one, so check the type
    # rather than the length to pick the time value.
    if isinstance(resend_period_parsed, struct_time):
        parsed_time = resend_period_parsed
    else:
        parsed_time = resend_period_parsed[0]
    # NOTE(review): mktime()/fromtimestamp() round-trip through local time
    # while the comparison below uses utcnow(); presumably the runtime's
    # local timezone is UTC - confirm.
    resend_earliest = datetime.fromtimestamp(mktime(parsed_time))

    log_extra = {
        'resend_earliest': resend_earliest,
        'blob_time_created': resend_blob.time_created
    }
    if datetime.utcnow() < resend_earliest:
        logger.info(
            'Ignore period not elapsed, not reprocessing the message.',
            extra=log_extra)
        return False

    logger.info('Ignore period elapsed, reprocessing the message now.',
                extra=log_extra)
    # Rewrite the marker so the ignore period starts over.
    resend_blob.upload_from_string('')
    return True