def wait_on_last_job()

in tools/cloud_functions/gcs_event_based_ingest/gcs_ocn_bq_ingest/common/ordering.py


def wait_on_last_job(gcs_client: storage.Client, bq_client: bigquery.Client,
                     lock_blob: storage.Blob, backfill_blob: storage.Blob,
                     job_id: str, table: bigquery.TableReference,
                     polling_timeout: int, retry_attempt_cnt: int) -> bool:
    """wait on a bigquery job or raise informative exception.

    Args:
        gcs_client: storage.Client
        bq_client: bigquery.Client
        lock_blob: storage.Blob _bqlock blob
        backfill_blob: storage.blob _BACKFILL blob
        job_id: str BigQuery job ID to wait on (read from _bqlock file)
        table: bigquery.TableReference table being loaded
        polling_timeout: int seconds to poll before returning.
        retry_attempt_cnt: counter for retry attempts
    """
    try:
        return utils.wait_on_bq_job_id(bq_client, job_id, table,
                                       polling_timeout)
    except (exceptions.BigQueryJobFailure, google.api_core.exceptions.NotFound,
            google.api_core.exceptions.ServerError,
            google.api_core.exceptions.BadRequest) as err:
        # Retry all internal 5xx and 400 errors up to the user-defined limit
        # set in the MAX_RETRIES_ON_BIGQUERY_ERROR constant.
        if isinstance(err, (google.api_core.exceptions.ServerError,
                            google.api_core.exceptions.BadRequest)):
            retry_attempt_cnt += 1  # Increment the retry count
            if retry_attempt_cnt <= constants.MAX_RETRIES_ON_BIGQUERY_ERROR:
                logging.log_with_table(
                    table, f"Retrying query due to retry-able error: {err}\n"
                    f"This is {retry_attempt_cnt=}")
                retry_query(gcs_client, bq_client, lock_blob, job_id, table,
                            retry_attempt_cnt)
                return False
            # Reaching this point means all retries on these errors have
            # been unsuccessful, so write the error to the _bqlock file,
            # then raise an exception.
            utils.handle_bq_lock(gcs_client,
                                 lock_blob,
                                 err.message,
                                 table,
                                 retry_attempt_cnt=retry_attempt_cnt)

        raise exceptions.BigQueryJobFailure(
            f"Previous BigQuery job: {job_id} failed or could not "
            "be found. This will kill the backfill subscriber for "
            f"{backfill_blob.name}. "
            "Once the issue is dealt with by a human, the lock "
            "file at: "
            f"gs://{lock_blob.bucket.name}/{lock_blob.name} "
            "should be manually removed and a new empty "
            f"{constants.BACKFILL_FILENAME} "
            "file uploaded to resume the backfill subscriber so it can "
            "continue with the next item in the backlog. "
            "Original Exception: "
            f"{traceback.format_exc()}") from err