def handle_concurrency_pre()

in main.py [0:0]


def _try_write_concurrency_lock(logger, concurrency_blob, expected_generation):
    """Write the empty lock object only if the blob is at expected_generation.

    Args:
        logger: Logger used to report a lost race.
        concurrency_blob: Storage blob used as the lock file.
        expected_generation: Generation the blob must currently have for the
            write to go through; 0 means "the object must not exist yet".

    Returns:
        True if the lock object was written, False if the generation
        precondition failed (another worker wrote the lock first).

    Raises:
        Exception: Any upload error other than a failed precondition is
            propagated unchanged.
    """
    try:
        concurrency_blob.upload_from_string(
            '', if_generation_match=expected_generation)
    except Exception as exc:
        # Handle TOCTOU condition: the blob changed between our check and
        # this write, which the service reports as 'conditionNotMet'.
        if 'conditionNotMet' in str(exc):
            logger.warning(
                'Message processing already in progress (concurrency lock file exists).',
                extra={'exception': exc})
            return False
        # Bare raise keeps the original traceback intact.
        raise
    return True


def handle_concurrency_pre(logger, concurrency_config, jinja_environment,
                           template_variables):
    """Try to acquire the concurrency lock file before processing a message.

    The lock is an empty object in a storage bucket; its bucket and object
    names come from get_concurrency_params(). If the object is absent it is
    created atomically. If it is present and the configuration has a
    'period', the lock is considered stale once that period (parsed relative
    to the object's creation time) has passed, and it is then taken over.
    Takeover is conditional on the object's generation, so that only one of
    several workers observing the same stale lock wins.

    Args:
        logger: Logger used for debug/info/warning output.
        concurrency_config: Mapping with the concurrency settings; the keys
            'period' and 'defer' are read here.
        jinja_environment: Passed through to get_concurrency_params().
        template_variables: Passed through to get_concurrency_params().

    Returns:
        True if the lock was acquired and processing may continue, False if
        the lock is held elsewhere and 'defer' is not enabled.

    Raises:
        ConcurrencyRetryException: The lock is held elsewhere (including
            when a creation/takeover race was lost) and 'defer' is truthy.
    """
    concurrency_bucket, concurrency_file = get_concurrency_params(
        concurrency_config, jinja_environment, template_variables)
    logger.debug('Checking if concurrency lock file exists in bucket...',
                 extra={
                     'bucket': concurrency_bucket,
                     'blob': concurrency_file
                 })

    if os.getenv('STORAGE_EMULATOR_HOST'):
        # Talking to a storage emulator: use anonymous credentials and point
        # the client at the emulator endpoint.
        from google.auth.credentials import AnonymousCredentials

        anon_credentials = AnonymousCredentials()
        storage_client = storage.Client(
            client_info=get_grpc_client_info(),
            client_options={"api_endpoint": os.getenv('STORAGE_EMULATOR_HOST')},
            credentials=anon_credentials)
    else:
        storage_client = storage.Client(client_info=get_grpc_client_info())

    bucket = storage_client.bucket(concurrency_bucket)
    concurrency_blob = bucket.blob(concurrency_file)

    if not concurrency_blob.exists():
        # Generation 0 means "create only if the object does not exist".
        if _try_write_concurrency_lock(logger, concurrency_blob, 0):
            return True
    elif 'period' in concurrency_config:
        # reload() fetches the metadata (time_created, generation) of the
        # existing lock object.
        concurrency_blob.reload()
        concurrency_period = concurrency_config['period']
        # parse() returns a (struct_time, context) pair; only the time is
        # needed here.
        parsed_time = parsedatetime.Calendar(
            version=parsedatetime.VERSION_CONTEXT_STYLE).parse(
                concurrency_period,
                sourceTime=concurrency_blob.time_created)[0]
        # Build the naive datetime straight from the parsed fields instead of
        # round-tripping through mktime()/fromtimestamp(), which would apply
        # the process's local timezone and DST rules.
        # NOTE(review): the comparison with utcnow() below presumes
        # time_created is in UTC - confirm.
        concurrency_earliest = datetime(*parsed_time[:6])
        # Captured before any upload, since writing the blob may refresh its
        # time_created.
        period_log_extra = {
            'process_earliest': concurrency_earliest,
            'blob_time_created': concurrency_blob.time_created
        }

        if datetime.utcnow() >= concurrency_earliest:
            # Take the stale lock over only if nobody has rewritten it since
            # reload(); otherwise two workers could both proceed.
            if _try_write_concurrency_lock(logger, concurrency_blob,
                                           concurrency_blob.generation):
                logger.info(
                    'Concurrency lock period elapsed, continuing with message processing.',
                    extra=period_log_extra)
                return True
        else:
            logger.info(
                'Concurrency lock period not elapsed, not processing the message.',
                extra=period_log_extra)
    else:
        logger.info(
            'Concurrency lock file exists, not processing the message.',
            extra={
                'bucket': concurrency_bucket,
                'blob': concurrency_file
            })

    # Reaching this point means the lock is held by someone else.
    if 'defer' in concurrency_config and concurrency_config['defer']:
        raise ConcurrencyRetryException(
            'Failing message processing due to concurrency control, allowing retry.'
        )
    return False