in tools/cloud_functions/gcs_event_based_ingest/gcs_ocn_bq_ingest/common/utils.py [0:0]
def check_for_bq_job_and_children_errors(
        bq_client: bigquery.Client, job: Union[bigquery.LoadJob,
                                               bigquery.QueryJob],
        table: Optional[bigquery.TableReference]):
    """Raise if a BigQuery job (or any of its child jobs) must be treated
    as failed.

    A job is considered failed when it finished with errors, or — for DML
    query jobs, while the FAIL_ON_ZERO_DML_ROWS_AFFECTED env var is truthy
    (the default) — when it completed without affecting any rows. Child jobs
    of multi-statement (script) queries are checked recursively.

    Args:
        bq_client: bigquery.Client
        job: Union[bigquery.LoadJob, bigquery.QueryJob] job to check for errors.
        table: bigquery.TableReference of table being loaded
    Raises:
        exceptions.BigQueryJobFailure
    """
    # Block until the job reaches a terminal state before inspecting it.
    if job.state != "DONE":
        wait_on_bq_job_id(bq_client, job.job_id, table, 5)

    if job.error_result:
        logging.log_bigquery_job(job, table)
        api_error: Optional[
            google.api_core.exceptions.GoogleAPICallError] = job.exception()
        # Surface 5xx server errors and BadRequest directly so callers can
        # retry the job; everything else becomes a BigQueryJobFailure.
        if isinstance(api_error, (google.api_core.exceptions.ServerError,
                                  google.api_core.exceptions.BadRequest)):
            raise api_error
        raise exceptions.BigQueryJobFailure(
            f"BigQuery Job {job.job_id} failed during backfill with the "
            f"following errors: {job.error_result} "
            f"{pprint.pformat(job.to_api_repr())}")

    # A DML statement that touched zero rows usually means a misconfigured
    # backfill; fail loudly unless the operator opted out via env var.
    zero_row_dml = (isinstance(job, bigquery.QueryJob) and
                    constants.FAIL_ON_ZERO_DML_ROWS_AFFECTED and
                    job.statement_type in constants.BQ_DML_STATEMENT_TYPES and
                    job.num_dml_affected_rows < 1)
    if zero_row_dml:
        logging.log_bigquery_job(
            job,
            table,
            "BigQuery query job ran successfully "
            "but did not affect any rows.",
            "ERROR",
        )
        raise exceptions.BigQueryJobFailure(
            f"query job {job.job_id} ran successfully but did not "
            f"affect any rows. {pprint.pformat(job.to_api_repr())}")

    # Scripts (multi-statement SQL) spawn child jobs; each must be clean too.
    for child in bq_client.list_jobs(parent_job=job):
        check_for_bq_job_and_children_errors(bq_client, child, table)