in tools/cloud_functions/gcs_event_based_ingest/gcs_ocn_bq_ingest/common/ordering.py [0:0]
def wait_on_last_job(gcs_client: storage.Client, bq_client: bigquery.Client,
                     lock_blob: storage.Blob, backfill_blob: storage.Blob,
                     job_id: str, table: bigquery.TableReference,
                     polling_timeout: int, retry_attempt_cnt: int) -> bool:
    """Wait on a BigQuery job or raise an informative exception.

    Transient server errors (5xx) and BadRequest (400) errors trigger a
    query retry up to constants.MAX_RETRIES_ON_BIGQUERY_ERROR attempts.
    Once retries are exhausted (or the error is not retryable), the error
    is recorded in the _bqlock file and a BigQueryJobFailure is raised.

    Args:
        gcs_client: storage.Client
        bq_client: bigquery.Client
        lock_blob: storage.Blob _bqlock blob
        backfill_blob: storage.Blob _BACKFILL blob
        job_id: str BigQuery job ID to wait on (read from _bqlock file)
        table: bigquery.TableReference table being loaded
        polling_timeout: int seconds to poll before returning.
        retry_attempt_cnt: counter for retry attempts
    Returns:
        bool: result of utils.wait_on_bq_job_id (job finished within the
        polling timeout), or False when a retryable error caused the query
        to be resubmitted via retry_query.
    Raises:
        exceptions.BigQueryJobFailure: when the previous job failed or
        could not be found and no further retries will be attempted.
    """
    try:
        return utils.wait_on_bq_job_id(bq_client, job_id, table,
                                       polling_timeout)
    except (exceptions.BigQueryJobFailure, google.api_core.exceptions.NotFound,
            google.api_core.exceptions.ServerError,
            google.api_core.exceptions.BadRequest) as err:
        # Retry all internal 5xx and 400 errors up to user-defined limit
        # set in MAX_RETRIES_ON_BIGQUERY_ERROR constant
        if isinstance(err, (google.api_core.exceptions.ServerError,
                            google.api_core.exceptions.BadRequest)):
            retry_attempt_cnt += 1  # Increment the retry count
            if retry_attempt_cnt <= constants.MAX_RETRIES_ON_BIGQUERY_ERROR:
                logging.log_with_table(
                    table, f"Retrying query due to retry-able error: {err}\n"
                    f"This is {retry_attempt_cnt=}")
                retry_query(gcs_client, bq_client, lock_blob, job_id, table,
                            retry_attempt_cnt)
                return False
        # Reaching this point means all retries on 5xx errors have
        # been unsuccessful so now we'll write the error to the
        # _bqlock file, then raise an exception.
        utils.handle_bq_lock(gcs_client,
                             lock_blob,
                             err.message,
                             table,
                             retry_attempt_cnt=retry_attempt_cnt)
        # Note: spaces added at the two concatenation seams below so the
        # rendered message does not read "...name.Once..." and
        # "...backlog.Original Exception...".
        raise exceptions.BigQueryJobFailure(
            f"previous BigQuery job: {job_id} failed or could not "
            "be found. This will kill the backfill subscriber for "
            f"{backfill_blob.name}. "
            "Once the issue is dealt with by a human, the lock "
            "file at: "
            f"gs://{lock_blob.bucket.name}/{lock_blob.name} "
            "should be manually removed and a new empty "
            f"{constants.BACKFILL_FILENAME} "
            "file uploaded to resume the backfill subscriber so it can "
            "continue with the next item in the backlog. "
            "Original Exception: "
            f"{traceback.format_exc()}") from err