def backlog_subscriber()

in tools/cloud_functions/gcs_event_based_ingest/gcs_ocn_bq_ingest/common/ordering.py [0:0]


def backlog_subscriber(gcs_client: Optional[storage.Client],
                       bq_client: Optional[bigquery.Client],
                       backfill_blob: storage.Blob, function_start_time: float):
    """Pick up the table lock, poll BQ job id until completion and process next
    item in the backlog.

    The loop repeatedly reads the table's ``_bqlock`` file, waits on the job
    recorded there (if any), removes the finished backlog item and hands over
    to ``handle_backlog`` for the next one. When the deadline derived from
    ``FUNCTION_TIMEOUT_SEC`` gets close, an empty ``_BACKFILL`` file is
    re-uploaded to re-trigger the subscriber.

    Args:
        gcs_client: Cloud Storage client; may be None, in which case
            ``_get_clients_if_none`` supplies one.
        bq_client: BigQuery client; may be None, in which case
            ``_get_clients_if_none`` supplies one.
        backfill_blob: the ``_BACKFILL`` blob that triggered this subscriber.
            Its bucket and table prefix determine which ``_bqlock`` and
            ``_backlog`` are used.
        function_start_time: start time of the invocation. It is compared
            against ``time.monotonic()``, so it must be a reading of that
            same clock.

    Raises:
        EnvironmentError: if there is not enough time left before the restart
            deadline to perform even a single polling iteration.
    """
    print(f"started backfill subscriber for gs://{backfill_blob.bucket.name}/"
          f"{backfill_blob.name}")
    gcs_client, bq_client = _get_clients_if_none(gcs_client, bq_client)
    # We need to retrigger the backfill loop before the Cloud Functions Timeout.
    restart_time = function_start_time + (
        float(os.getenv("FUNCTION_TIMEOUT_SEC", "60")) -
        constants.RESTART_BUFFER_SECONDS)
    print(f"restart time is {restart_time}")
    bkt = backfill_blob.bucket
    utils.handle_duplicate_notification(gcs_client, backfill_blob)
    table_prefix = utils.get_table_prefix(gcs_client, backfill_blob)
    last_job_done = False
    # we will poll for job completion this long in an individual iteration of
    # the while loop (before checking if we are too close to cloud function
    # timeout and should retrigger).
    polling_timeout = 5  # seconds
    lock_blob: storage.Blob = bkt.blob(f"{table_prefix}/_bqlock")
    # These values do not change between iterations, so compute them once.
    lock_uri = f"gs://{bkt.name}/{lock_blob.name}"
    job_prefix = os.getenv('JOB_PREFIX', constants.DEFAULT_JOB_PREFIX)
    if restart_time - polling_timeout < time.monotonic():
        raise EnvironmentError(
            "The Cloud Function timeout is too short for "
            "backlog subscriber to do it's job. We recommend "
            "setting the timeout to 540 seconds or at least "
            "1 minute (Cloud Functions default).")
    while time.monotonic() < restart_time - polling_timeout - 1:
        lock_contents_str = utils.read_gcs_file_if_exists(gcs_client, lock_uri)
        lock_contents: Dict = json.loads(lock_contents_str or '{}')
        # A missing (or empty) _bqlock means no job was submitted yet, so this
        # iteration is the first claim of the lock.
        first_bq_lock_claim = not lock_contents
        if first_bq_lock_claim:
            last_job_done = True  # there's no running job to poll.
        else:
            print(
                json.dumps(
                    dict(message=f"View lock contents in jsonPayload for"
                         f" {lock_uri}",
                         lock_contents=lock_contents)))
            job_id = lock_contents.get('job_id')
            # Is this a lock placed by this cloud function? Anything else is
            # treated as a manual _bqlock.
            if not (job_id and job_id.startswith(job_prefix)):
                print(f"sleeping for {polling_timeout} seconds because "
                      f"found manual lock {lock_uri} with "
                      f"manual lock contents: {lock_contents}. "
                      "This will be an infinite loop until the manual lock is "
                      "released. ")
                time.sleep(polling_timeout)
                continue
            # Only parse the table reference for locks written by this
            # function: a manual lock is not required to carry a 'table'
            # entry, and parsing a missing one would raise instead of waiting
            # for the manual lock to be released.
            table = bigquery.TableReference.from_api_repr(
                lock_contents.get('table'))
            # To keep track of retry attempts between cloud function
            # invocations, the retry count state is kept in the _bqlock lock
            # file. A missing or falsy entry means no retries so far.
            retry_attempt_cnt: int = int(
                lock_contents.get('retry_attempt_cnt') or 0)
            last_job_done = wait_on_last_job(gcs_client, bq_client, lock_blob,
                                             backfill_blob, job_id, table,
                                             polling_timeout,
                                             retry_attempt_cnt)

        if not last_job_done:
            # keep polling the running job.
            continue

        # if reached here, last job is done.
        if not first_bq_lock_claim:
            # If the BQ lock was missing we do not want to delete a backlog
            # item for a job we have not yet submitted.
            utils.remove_oldest_backlog_item(gcs_client, bkt, table_prefix)
            # Must exit subscriber if constants.WAIT_FOR_VALIDATION is set to
            # True and there are other items in the _backlog because we do not
            # want to process the next backlog item until validation completes
            # for the batch that was just loaded. Validation process will drop
            # a new _BACKFILL file as a signal to continue processing
            # the remaining _backlog items.
            if constants.WAIT_FOR_VALIDATION and utils.get_next_backlog_item(
                    gcs_client, bkt, table_prefix) is not None:
                # Remove the lock blob so that the next time a _BACKFILL file
                # is dropped, it can begin processing the next item in _backlog.
                utils.remove_blob_quietly(gcs_client, lock_blob)
                print(f"{constants.WAIT_FOR_VALIDATION=} has stopped the "
                      f"processing of _backlog items to allow a separate "
                      f"validation process to start. When validation has "
                      f"completed, the process must drop a new _BACKFILL file "
                      f"to instruct this cloud function to continue processing "
                      f"the _backlog items.")
                return

        # Submit the next item in the _backlog if it is non-empty or
        # clean up the _BACKFILL and _bqlock files
        should_subscriber_exit = handle_backlog(gcs_client, bq_client, bkt,
                                                lock_blob, backfill_blob)
        if should_subscriber_exit:
            return
    # re-trigger the subscriber loop by reposting the _BACKFILL file
    print("ran out of time, restarting backfill subscriber loop for: "
          f"gs://{bkt.name}/{table_prefix}")
    backfill_blob = bkt.blob(f"{table_prefix}/{constants.BACKFILL_FILENAME}")
    backfill_blob.upload_from_string("")