in main.py [0:0]
def handle_concurrency_pre(logger, concurrency_config, jinja_environment,
                           template_variables):
    """Try to take the concurrency lock before a message is processed.

    The lock is an empty blob in a storage bucket, reached through
    ``storage.Client``. Bucket and blob names come from
    ``get_concurrency_params``. When the ``STORAGE_EMULATOR_HOST``
    environment variable is set, the client is pointed at that endpoint and
    uses anonymous credentials.

    Args:
        logger: Logger used for the debug/info/warning messages emitted here.
        concurrency_config: Concurrency settings. The keys read directly in
            this function are the optional ``period`` (a time expression
            parsed with ``parsedatetime``, relative to the lock blob's
            creation time) and the optional ``defer`` flag. The whole mapping
            is also handed to ``get_concurrency_params``.
        jinja_environment: Passed through to ``get_concurrency_params``.
        template_variables: Passed through to ``get_concurrency_params``.

    Returns:
        True when processing may continue: either the lock blob did not exist
        and was created, or it existed, ``period`` is configured and that
        period has elapsed (the blob is then rewritten). False when the lock
        is held and ``defer`` is not truthy, or when creating the lock failed
        with a ``conditionNotMet`` error.

    Raises:
        ConcurrencyRetryException: The lock is held (no ``period``, or the
            period has not elapsed) and ``defer`` is truthy.
        Exception: Any lock-creation failure whose text does not contain
            ``conditionNotMet`` is re-raised unchanged.
    """
    lock_bucket_name, lock_blob_name = get_concurrency_params(
        concurrency_config, jinja_environment, template_variables)
    logger.debug('Checking if concurrency lock file exists in bucket...',
                 extra={
                     'bucket': lock_bucket_name,
                     'blob': lock_blob_name
                 })

    if os.getenv('STORAGE_EMULATOR_HOST'):
        # Emulator mode: no real credentials, custom API endpoint.
        from google.auth.credentials import AnonymousCredentials
        emulator_credentials = AnonymousCredentials()
        storage_client = storage.Client(
            client_info=get_grpc_client_info(),
            client_options={"api_endpoint": os.getenv('STORAGE_EMULATOR_HOST')},
            credentials=emulator_credentials)
    else:
        storage_client = storage.Client(client_info=get_grpc_client_info())

    lock_blob = storage_client.bucket(lock_bucket_name).blob(lock_blob_name)

    if not lock_blob.exists():
        # No lock yet: try to create it. if_generation_match=0 asks the upload
        # to succeed only if the blob still does not exist, so a competing
        # writer that got in after the exists() check makes this call fail.
        try:
            lock_blob.upload_from_string('', if_generation_match=0)
        except Exception as exc:
            # Handle TOCTOU condition
            if 'conditionNotMet' not in str(exc):
                raise exc
            logger.warning(
                'Message processing already in progress (concurrency lock file exists).',
                extra={'exception': exc})
            return False
        return True

    # From here on the lock blob exists.
    if 'period' not in concurrency_config:
        logger.info(
            'Concurrency lock file exists, not processing the message.',
            extra={
                'bucket': lock_bucket_name,
                'blob': lock_blob_name
            })
    else:
        # Refresh blob metadata so time_created is populated.
        lock_blob.reload()
        period_expression = concurrency_config['period']
        calendar = parsedatetime.Calendar(
            version=parsedatetime.VERSION_CONTEXT_STYLE)
        parse_result = calendar.parse(period_expression,
                                      sourceTime=lock_blob.time_created)
        # The parse result is either a sequence whose first item is the time
        # struct, or (when its length is not greater than one) is passed to
        # mktime as-is.
        expiry_struct = (parse_result[0]
                         if len(parse_result) > 1 else parse_result)
        process_earliest = datetime.fromtimestamp(mktime(expiry_struct))

        if datetime.utcnow() >= process_earliest:
            logger.info(
                'Concurrency lock period elapsed, continuing with message processing.',
                extra={
                    'process_earliest': process_earliest,
                    'blob_time_created': lock_blob.time_created
                })
            # Rewrite the lock blob; presumably this refreshes its creation
            # time so the period starts over -- TODO confirm.
            lock_blob.upload_from_string('')
            return True

        logger.info(
            'Concurrency lock period not elapsed, not processing the message.',
            extra={
                'process_earliest': process_earliest,
                'blob_time_created': lock_blob.time_created
            })

    # The lock is held by someone else: either ask for a retry or give up.
    if 'defer' in concurrency_config and concurrency_config['defer']:
        raise ConcurrencyRetryException(
            'Failing message processing due to concurrency control, allowing retry.'
        )
    return False