in tools/cloud_functions/gcs_event_based_ingest/gcs_ocn_bq_ingest/common/ordering.py [0:0]
def backlog_subscriber(gcs_client: Optional[storage.Client],
                       bq_client: Optional[bigquery.Client],
                       backfill_blob: storage.Blob, function_start_time: float):
    """Pick up the table lock, poll BQ job id until completion and process next
    item in the backlog.

    The loop repeatedly reads the table's `_bqlock` file:

    * lock written by this function (job id starts with the configured job
      prefix): poll that job via `wait_on_last_job`; once it is done, remove
      the oldest backlog item and hand over to `handle_backlog`.
    * any other non-empty lock (manual lock): sleep and re-check until it is
      released or the restart deadline is reached.
    * no lock contents: hand over to `handle_backlog` without removing a
      backlog item.

    If the restart deadline is reached before the subscriber exits, an empty
    _BACKFILL file is re-uploaded to re-trigger the subscriber.

    Args:
        gcs_client: storage client; passed through `_get_clients_if_none`
            which supplies the client actually used.
        bq_client: BigQuery client; passed through `_get_clients_if_none`
            which supplies the client actually used.
        backfill_blob: the _BACKFILL blob that triggered this subscriber.
        function_start_time: start time of this invocation. It is compared
            against `time.monotonic()`, so it must come from the same clock.

    Raises:
        EnvironmentError: if there is not enough time before the restart
            deadline for even a single polling iteration.
    """
    print(f"started backfill subscriber for gs://{backfill_blob.bucket.name}/"
          f"{backfill_blob.name}")
    gcs_client, bq_client = _get_clients_if_none(gcs_client, bq_client)
    # We need to retrigger the backfill loop before the Cloud Functions Timeout.
    restart_time = function_start_time + (
        float(os.getenv("FUNCTION_TIMEOUT_SEC", "60")) -
        constants.RESTART_BUFFER_SECONDS)
    print(f"restart time is {restart_time}")
    bkt = backfill_blob.bucket
    utils.handle_duplicate_notification(gcs_client, backfill_blob)
    table_prefix = utils.get_table_prefix(gcs_client, backfill_blob)
    last_job_done = False
    # we will poll for job completion this long in an individual iteration of
    # the while loop (before checking if we are too close to cloud function
    # timeout and should retrigger).
    polling_timeout = 5  # seconds
    lock_blob: storage.Blob = bkt.blob(f"{table_prefix}/_bqlock")
    # Job ids submitted by this cloud function start with this prefix; it is
    # what distinguishes our own lock from a manually placed one.
    job_prefix = os.getenv('JOB_PREFIX', constants.DEFAULT_JOB_PREFIX)
    if restart_time - polling_timeout < time.monotonic():
        raise EnvironmentError(
            "The Cloud Function timeout is too short for "
            "backlog subscriber to do it's job. We recommend "
            "setting the timeout to 540 seconds or at least "
            "1 minute (Cloud Functions default).")
    while time.monotonic() < restart_time - polling_timeout - 1:
        first_bq_lock_claim = False
        lock_contents_str = utils.read_gcs_file_if_exists(
            gcs_client, f"gs://{bkt.name}/{lock_blob.name}")
        # A falsy result (no lock contents) is parsed as an empty dict and
        # handled by the "absence of _bqlock" branch below.
        lock_contents: Dict = json.loads(lock_contents_str or '{}')
        if lock_contents:
            print(
                json.dumps(
                    dict(message=f"View lock contents in jsonPayload for"
                         f" gs://{bkt.name}/{lock_blob.name}",
                         lock_contents=lock_contents)))
            job_id = lock_contents.get('job_id')
            # is this a lock placed by this cloud function.
            # the else will handle a manual _bqlock
            if job_id and job_id.startswith(job_prefix):
                # Only parse the table reference for our own locks: a manual
                # lock is not required to carry a 'table' entry and must
                # still reach the manual-lock branch instead of failing here.
                table = bigquery.TableReference.from_api_repr(
                    lock_contents.get('table'))
                # To keep track of retry attempts between cloud
                # function invocations, the retry count state is
                # kept in the _bqlock lock file. Missing/empty means 0.
                retry_attempt_cnt: int = int(
                    lock_contents.get('retry_attempt_cnt') or 0)
                last_job_done = wait_on_last_job(gcs_client, bq_client,
                                                 lock_blob, backfill_blob,
                                                 job_id, table,
                                                 polling_timeout,
                                                 retry_attempt_cnt)
            else:
                print(f"sleeping for {polling_timeout} seconds because "
                      f"found manual lock gs://{bkt.name}/{lock_blob.name} "
                      f"with manual lock contents: {lock_contents}. "
                      "This will be an infinite loop until the manual lock is "
                      "released. ")
                time.sleep(polling_timeout)
                continue
        else:  # this condition handles absence of _bqlock file
            first_bq_lock_claim = True
            last_job_done = True  # there's no running job to poll.
        if not last_job_done:
            # keep polling the running job.
            continue
        # if reached here, last job is done.
        if not first_bq_lock_claim:
            # If the BQ lock was missing we do not want to delete a backlog
            # item for a job we have not yet submitted.
            utils.remove_oldest_backlog_item(gcs_client, bkt, table_prefix)
            # Must exit subscriber if constants.WAIT_FOR_VALIDATION is set to
            # True and there are other items in the _backlog because we do not
            # want to process the next backlog item until validation completes
            # for the batch that was just loaded. Validation process will drop
            # a new _BACKFILL file as a signal to continue processing
            # the remaining _backlog items.
            if constants.WAIT_FOR_VALIDATION and utils.get_next_backlog_item(
                    gcs_client, bkt, table_prefix) is not None:
                # Remove the lock blob so that the next time a _BACKFILL file
                # is dropped, it can begin processing the next item in _backlog.
                utils.remove_blob_quietly(gcs_client, lock_blob)
                print(f"{constants.WAIT_FOR_VALIDATION=} has stopped the "
                      f"processing of _backlog items to allow a separate "
                      f"validation process to start. When validation has "
                      f"completed, the process must drop a new _BACKFILL file "
                      f"to instruct this cloud function to continue processing "
                      f"the _backlog items.")
                return
        # Submit the next item in the _backlog if it is non-empty or
        # clean up the _BACKFILL and _bqlock files
        should_subscriber_exit = handle_backlog(gcs_client, bq_client, bkt,
                                                lock_blob, backfill_blob)
        if should_subscriber_exit:
            return
    # re-trigger the subscriber loop by reposting the _BACKFILL file
    print("ran out of time, restarting backfill subscriber loop for: "
          f"gs://{bkt.name}/{table_prefix}")
    backfill_blob = bkt.blob(f"{table_prefix}/{constants.BACKFILL_FILENAME}")
    backfill_blob.upload_from_string("")