in data_validation/result_handlers/bigquery.py [0:0]
def _insert_bigquery(self, result_df):
    """Insert the rows of ``result_df`` into the configured BigQuery table.

    The table is looked up through ``self._bigquery_client`` using
    ``self._table_id`` and the dataframe is written with
    ``insert_rows_from_dataframe``. On success an info message is logged,
    including the run id taken from the first row when the dataframe is
    not empty.

    Args:
        result_df: Dataframe holding the validation results to write.

    Raises:
        exceptions.ResultHandlerException: If any insert chunk reported
            errors. When the first reported message matches a known
            missing-column error, the exception text points at the schema
            migration script to run.
    """
    table = self._bigquery_client.get_table(self._table_id)
    chunk_errors = self._bigquery_client.insert_rows_from_dataframe(
        table, result_df
    )

    if any(chunk_errors):
        # Errors are reported per insert chunk, so the first chunk may be
        # empty while a later one failed. Scan all chunks for the first
        # message instead of indexing chunk_errors[0][0] directly, which
        # raised IndexError and hid the real failure.
        first_message = next(
            (
                detail.get("message", "")
                for chunk in chunk_errors
                for row_error in chunk
                for detail in row_error.get("errors", ())
            ),
            "",
        )

        if first_message == "no such field: validation_status.":
            raise exceptions.ResultHandlerException(
                "Please update your BigQuery results table schema using the script: samples/bq_utils/rename_column_schema.sh.\n"
                f"The latest release of DVT has updated the column name 'status' to 'validation_status': {chunk_errors}"
            )
        if first_message == "no such field: primary_keys.":
            raise exceptions.ResultHandlerException(
                "Please update your BigQuery results table schema using the script: samples/bq_utils/add_columns_schema.sh.\n"
                f"The latest release of DVT has added two fields 'primary_keys' and 'num_random_rows': {chunk_errors}"
            )
        raise exceptions.ResultHandlerException(
            f"Could not write rows: {chunk_errors}"
        )

    if result_df.empty:
        logging.info(BQRH_NO_WRITE_MESSAGE)
    else:
        logging.info(
            f"{BQRH_WRITE_MESSAGE}, run id: {result_df.iloc[0][consts.CONFIG_RUN_ID]}"
        )