def _insert_bigquery()

in data_validation/result_handlers/bigquery.py [0:0]


    def _insert_bigquery(self, result_df):
        """Insert the rows of ``result_df`` into the BigQuery results table.

        The table is looked up via ``self._table_id`` and the rows are written
        with ``insert_rows_from_dataframe``. On success an info message is
        logged (including the run id of the first row when the frame is not
        empty).

        Args:
            result_df: DataFrame holding the validation results to write.

        Raises:
            exceptions.ResultHandlerException: If BigQuery reports any insert
                error. When one of the reported messages points at a known
                outdated results-table schema, the exception text names the
                migration script to run.
        """
        table = self._bigquery_client.get_table(self._table_id)
        chunk_errors = self._bigquery_client.insert_rows_from_dataframe(
            table, result_df
        )
        if any(chunk_errors):
            # Known "schema is out of date" messages mapped to remediation hints.
            schema_hints = {
                "no such field: validation_status.": (
                    "Please update your BigQuery results table schema using the script: samples/bq_utils/rename_column_schema.sh.\n"
                    "The latest release of DVT has updated the column name 'status' to 'validation_status'"
                ),
                "no such field: primary_keys.": (
                    "Please update your BigQuery results table schema using the script: samples/bq_utils/add_columns_schema.sh.\n"
                    "The latest release of DVT has added two fields 'primary_keys' and 'num_random_rows'"
                ),
            }
            # Errors come back as one list per chunk, so the first chunk may be
            # empty even though a later one failed. Scan every reported message
            # instead of indexing chunk_errors[0][0] (which raised IndexError
            # and hid the real failure in that situation).
            messages = (
                error.get("message")
                for chunk in chunk_errors
                for row in chunk
                for error in row.get("errors", ())
            )
            hint = next(
                (schema_hints[msg] for msg in messages if msg in schema_hints),
                None,
            )
            if hint is not None:
                raise exceptions.ResultHandlerException(f"{hint}: {chunk_errors}")
            raise exceptions.ResultHandlerException(
                f"Could not write rows: {chunk_errors}"
            )

        if result_df.empty:
            logging.info(BQRH_NO_WRITE_MESSAGE)
        else:
            logging.info(
                f"{BQRH_WRITE_MESSAGE}, run id: {result_df.iloc[0][consts.CONFIG_RUN_ID]}"
            )