# data_validation/result_handlers/bigquery.py (71 lines of code) (raw):
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Output validation report to BigQuery tables"""
import logging
from data_validation import clients, consts, exceptions, util
from data_validation.result_handlers.base_backend import BaseBackendResultHandler
# Prefix of the info-level log line emitted after a non-empty result set has
# been inserted; the handler appends the run id to it.
BQRH_WRITE_MESSAGE = "Results written to BigQuery"
# Info-level log line emitted when the result DataFrame is empty.
BQRH_NO_WRITE_MESSAGE = "No results to write to BigQuery"
class BigQueryResultHandler(BaseBackendResultHandler):
    """Write results of data validation to BigQuery.

    Arguments:
        bigquery_client (google.cloud.bigquery.client.Client):
            BigQuery client for uploading results.
        status_list (list):
            Provided status to filter the results with.
        table_id (str):
            Fully-qualified table ID (``project-id.dataset.table``) of
            destination table for results.
        text_format (str, optional):
            This allows the user to influence the text results written via
            logger.debug.
    """

    def __init__(
        self,
        bigquery_client,
        status_list: list = None,
        table_id: str = "pso_data_validator.results",
        text_format: str = consts.FORMAT_TYPE_TABLE,
    ):
        self._bigquery_client = bigquery_client
        self._table_id = table_id
        self._status_list = status_list
        self._text_format = text_format

    @staticmethod
    def get_handler_for_project(
        project_id,
        status_list=None,
        table_id: str = "pso_data_validator.results",
        credentials=None,
        api_endpoint: str = None,
        text_format: str = consts.FORMAT_TYPE_TABLE,
    ):
        """Return BigQueryResultHandler instance for given project.

        Args:
            project_id (str): Project ID used for validation results.
            table_id (str): Table ID used for validation results.
            credentials (google.auth.credentials.Credentials):
                Explicit credentials to use in case default credentials
                aren't working properly.
            status_list (list): provided status to filter the results with
            api_endpoint (str): BigQuery API endpoint (e.g. https://mybq.p.googleapis.com)
            text_format (str, optional):
                This allows the user to influence the text results written via logger.debug.
                See: https://github.com/GoogleCloudPlatform/professional-services-data-validator/issues/871

        Returns:
            BigQueryResultHandler: Handler wrapping a BigQuery client created
            for ``project_id``.
        """
        client = clients.get_google_bigquery_client(
            project_id, credentials=credentials, api_endpoint=api_endpoint
        )
        return BigQueryResultHandler(
            client,
            status_list=status_list,
            table_id=table_id,
            text_format=text_format,
        )

    def _insert_bigquery(self, result_df):
        """Insert the rows of ``result_df`` into the configured results table.

        Args:
            result_df (pandas.DataFrame): Validation results to upload.

        Raises:
            exceptions.ResultHandlerException: If BigQuery reports any row
                insert error. When the first reported error shows that the
                destination table is missing a column written by newer DVT
                releases, the message points at the schema upgrade script.
        """
        table = self._bigquery_client.get_table(self._table_id)
        chunk_errors = self._bigquery_client.insert_rows_from_dataframe(
            table, result_df
        )

        if any(chunk_errors):
            # insert_rows_from_dataframe returns one error list per uploaded
            # chunk, so the first chunk may be empty while a later one
            # failed. Take the first row error from whichever chunk has one
            # instead of indexing chunk_errors[0][0] blindly.
            first_row_error = next(
                (row_error for chunk in chunk_errors for row_error in chunk),
                None,
            )
            try:
                first_message = first_row_error["errors"][0]["message"]
            except (KeyError, IndexError, TypeError):
                # Unexpected error payload: fall through to the generic
                # exception below rather than masking the insert failure.
                first_message = None

            if first_message == "no such field: validation_status.":
                raise exceptions.ResultHandlerException(
                    "Please update your BigQuery results table schema using the script: samples/bq_utils/rename_column_schema.sh.\n"
                    f"The latest release of DVT has updated the column name 'status' to 'validation_status': {chunk_errors}"
                )
            if first_message == "no such field: primary_keys.":
                raise exceptions.ResultHandlerException(
                    "Please update your BigQuery results table schema using the script: samples/bq_utils/add_columns_schema.sh.\n"
                    f"The latest release of DVT has added two fields 'primary_keys' and 'num_random_rows': {chunk_errors}"
                )
            raise exceptions.ResultHandlerException(
                f"Could not write rows: {chunk_errors}"
            )

        if result_df.empty:
            logging.info(BQRH_NO_WRITE_MESSAGE)
        else:
            logging.info(
                f"{BQRH_WRITE_MESSAGE}, run id: {result_df.iloc[0][consts.CONFIG_RUN_ID]}"
            )

    def execute(self, result_df):
        """Filter, upload and report the validation results.

        The DataFrame is first narrowed with the inherited
        ``_filter_by_status_list``, then written to BigQuery (timed via
        ``util.timed_call``), and finally handed to the inherited
        ``_call_text_handler``.

        Args:
            result_df (pandas.DataFrame): Validation results.

        Returns:
            pandas.DataFrame: The filtered results that were written.
        """
        result_df = self._filter_by_status_list(result_df)
        util.timed_call("Write results to BigQuery", self._insert_bigquery, result_df)
        self._call_text_handler(result_df)
        return result_df