data_validation/result_handlers/postgres.py

# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Output validation report to PostgreSQL table"""

import logging
from typing import Iterable, TYPE_CHECKING

import numpy
import sqlalchemy

from data_validation import clients, consts, util
from data_validation.result_handlers.base_backend import (
    BaseBackendResultHandler,
    RESULTS_TABLE_SCHEMA,
    RH_WRITE_MESSAGE,
    RH_NO_WRITE_MESSAGE,
)

if TYPE_CHECKING:
    from pandas import DataFrame
    from ibis.backends.base import BaseBackend


def _psql_insert_copy(table, conn, keys: list, data_iter: Iterable):
    """
    Execute SQL statement inserting data.

    Taken from pandas documentation:
        https://pandas.pydata.org/pandas-docs/stable/user_guide/io.html#insertion-method

    Parameters
    ----------
    table : pandas.io.sql.SQLTable
    conn : sqlalchemy.engine.Engine or sqlalchemy.engine.Connection
    keys : list[str]
        List of column names.
    data_iter : Iterable
        Iterates the values to be inserted.
    """
    import csv
    from io import StringIO

    # Gets a DBAPI connection that can provide a cursor.
    dbapi_conn = conn.connection
    with dbapi_conn.cursor() as cur:
        s_buf = StringIO()
        writer = csv.writer(s_buf)
        writer.writerows(data_iter)
        s_buf.seek(0)

        columns = ", ".join(['"{}"'.format(k) for k in keys])
        if table.schema:
            table_name = "{}.{}".format(table.schema, table.name)
        else:
            table_name = table.name

        sql = "COPY {} ({}) FROM STDIN WITH CSV".format(table_name, columns)
        cur.copy_expert(sql=sql, file=s_buf)


class PostgresResultHandler(BaseBackendResultHandler):
    """Write results of data validation to PostgreSQL."""

    def __init__(
        self,
        client: "BaseBackend",
        status_list: list = None,
        table_id: str = "pso_data_validator.results",
        text_format: str = consts.FORMAT_TYPE_TABLE,
    ):
        self._client = client
        self._table_id = table_id
        self._status_list = status_list
        self._text_format = text_format

    @staticmethod
    def get_handler_for_connection(
        connection_config: dict,
        status_list=None,
        table_id: str = "pso_data_validator.results",
        text_format: str = consts.FORMAT_TYPE_TABLE,
    ):
        """Return PostgresResultHandler instance for given connection config.

        Args:
            table_id (str): Table ID used for validation results.
            status_list (list): Provided statuses to filter the results with.
            text_format (str, optional): This allows the user to influence the text results
                written via logger.debug. See:
                https://github.com/GoogleCloudPlatform/professional-services-data-validator/issues/871
        """
        client = clients.get_data_client(connection_config)
        return PostgresResultHandler(
            client,
            status_list=status_list,
            table_id=table_id,
            text_format=text_format,
        )

    def _set_current_schema(self, schema_name: str):
        """Set the current schema in a PostgreSQL backend.

        We need to do this due to Ibis limitations:
            NotImplementedError: Creating tables from a different database is not yet implemented
            NotImplementedError: Inserting data to a table from a different database is not yet implemented
        """
        with self._client.begin() as con:
            _ = con.exec_driver_sql(f"SET schema '{schema_name}'")

    def _dataframe_to_sql(self, schema_name, table_name, result_df):
        """Insert DataFrame data into a PostgreSQL table using the pandas.DataFrame.to_sql() method."""

        def label_to_string(label) -> str:
            if isinstance(label, (list, tuple, numpy.ndarray)) and len(label) == 2:
                # This is the expected format.
                return f"'{label[0]}={label[1]}'"
            else:
                # Anything else.
                return f"'{label}'"

        def labels_to_array_literal(labels):
            """Convert Pandas labels array into a PostgreSQL array literal."""
            if not labels:
                return "{}"
            return "{" + ",".join(label_to_string(_) for _ in labels) + "}"

        result_df[consts.CONFIG_LABELS] = result_df.labels.apply(
            lambda x: labels_to_array_literal(x)
        )
        result_df.to_sql(
            table_name,
            self._client.con,
            schema=schema_name,
            if_exists="append",
            index=False,
            chunksize=1000,
            method=_psql_insert_copy,
        )

    def _insert_postgres(self, result_df: "DataFrame"):
        """Store the validation results DataFrame to an Ibis backend."""
        if "." in self._table_id:
            schema_name, table_name = self._table_id.split(".")
        else:
            schema_name, table_name = None, self._table_id

        if schema_name:
            self._set_current_schema(schema_name)

        try:
            _ = clients.get_ibis_table(self._client, schema_name, table_name)
            # Do nothing, the table exists.
        except sqlalchemy.exc.NoSuchTableError:
            self._client.create_table(table_name, schema=RESULTS_TABLE_SCHEMA)

        if not result_df.empty:
            self._dataframe_to_sql(schema_name, table_name, result_df)

        if result_df.empty:
            logging.info(RH_NO_WRITE_MESSAGE)
        else:
            logging.info(
                f"{RH_WRITE_MESSAGE} to {self._table_id}, run id: {result_df.iloc[0][consts.CONFIG_RUN_ID]}"
            )

    def execute(self, result_df: "DataFrame"):
        result_df = self._filter_by_status_list(result_df)
        util.timed_call("Write results to PostgreSQL", self._insert_postgres, result_df)
        self._call_text_handler(result_df)
        return result_df
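

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the module above): a minimal
# example of how this handler might be obtained and invoked. The connection
# config values, the "fail" status string and the result_df variable are
# hypothetical placeholders; only get_handler_for_connection() and execute()
# come from the class defined above.
#
#   from data_validation.result_handlers.postgres import PostgresResultHandler
#
#   pg_conn_config = {  # hypothetical PostgreSQL connection config
#       "source_type": "Postgres",
#       "host": "localhost",
#       "port": 5432,
#       "user": "dvt",
#       "password": "dvt",
#       "database": "dvt_results",
#   }
#   handler = PostgresResultHandler.get_handler_for_connection(
#       pg_conn_config,
#       status_list=["fail"],                   # assumed status value; only persist failures
#       table_id="pso_data_validator.results",  # schema-qualified results table (default)
#   )
#   written_df = handler.execute(result_df)     # result_df: DataFrame from a validation run
# ---------------------------------------------------------------------------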