data_validation/combiner.py (439 lines of code) (raw):

# Copyright 2020 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Module to combine two validation result sets into a single validation run. To avoid data precision loss, a BigQuery data type as closely matching the original data type is used. """ import datetime import functools import json import logging from typing import TYPE_CHECKING import ibis import ibis.expr.datatypes as dt import pandas from data_validation import consts if TYPE_CHECKING: from pandas import DataFrame import ibis.expr.types.relations.table as IbisTable from data_validation.metadata import RunMetadata, ValidationMetadata # At around 140 columns we hit RecursionError when unioning Ibis subqueries. # This constant is a threshold at which we slice up the input Dataframes # and stitch them back together again after Ibis processing. COMBINER_COLUMN_SLICE_WIDTH = 120 COMBINER_GET_SUMMARY_EXC_TEXT = ( "Error while generating summary report of row validation results" ) def generate_report( run_metadata: "RunMetadata", source_df: "DataFrame", target_df: "DataFrame", join_on_fields=(), is_value_comparison=False, verbose=False, ) -> "DataFrame": """Combine results into a report. This function is a wrapper around _generate_report_slice(). _generate_report_slice() does the main work, this wrapper simply manages the input columns and stitches the results back together. This is because validations of > 140(ish) columns trigger a RecursionError when unioning Ibis subqueries. In this method we pass in column slices of the incoming Dataframes and combine the results. It is a bit of a hack but I cannot find a way to optimize the Ibis processing. It appears to be inefficient in that we create a subquery for each validation (column) in _calculate_differences() and then union them all. We then do the same on the source/target table expressions to join it all back together again. I (nj1973) spent a singificant amount of time trying to understand/optimize the Ibis processing but fell back on this simpler (less risky) workaround. Returns: pandas.DataFrame: A pandas DataFrame with the results of the validation in the same schema as the report table. """ _check_schema_names(source_df, target_df) join_on_fields = tuple(join_on_fields) validation_columns = run_metadata.validations.keys() # slice_thresholds is a list of points at which we should break up the Dataframe by column. # e.g. [10, 20, 30] would mean process columns 0-9, 10-19 and 20-the max column. # 1. len(...) / COMBINER_COLUMN_SLICE_WIDTH: Divides total columns by the slice width to get the number of slices. # 2. int(...) + 1: int()+1 is effectively ceil() which is what we want to get the actual whole number of slices # 3. _ * COMBINER_COLUMN_SLICE_WIDTH: Multiplies each number by the slice width to get actual column counts for each slice. slice_thresholds = [ (_ * COMBINER_COLUMN_SLICE_WIDTH) for _ in range(int(len(validation_columns) / COMBINER_COLUMN_SLICE_WIDTH) + 1) ] result_df = None # Process the input Dataframes in slices of columns to avoid "RecursionError"s. for slice_start in slice_thresholds: columns_in_vertical_slice = list(validation_columns)[ slice_start : slice_start + COMBINER_COLUMN_SLICE_WIDTH ] # Ensure any join columns are included in the column slice. columns_in_vertical_slice.extend( set(join_on_fields) - set(columns_in_vertical_slice) ) interim_result_df = _generate_report_slice( run_metadata, source_df[columns_in_vertical_slice], target_df[columns_in_vertical_slice], join_on_fields=join_on_fields, is_value_comparison=is_value_comparison, verbose=verbose, ) if result_df is None: result_df = interim_result_df else: result_df = pandas.concat([result_df, interim_result_df]) # Get the first validation metadata object to fill source and/or target empty table names. first = run_metadata.validations[next(iter(run_metadata.validations))] if first.validation_type != consts.CUSTOM_QUERY: result_df.source_table_name.fillna( first.get_table_name(consts.RESULT_TYPE_SOURCE), inplace=True ) result_df.target_table_name.fillna( first.get_table_name(consts.RESULT_TYPE_TARGET), inplace=True ) _get_summary(run_metadata, result_df, source_df, target_df) return result_df def _generate_report_slice( run_metadata: "RunMetadata", source_df: "DataFrame", target_df: "DataFrame", join_on_fields=(), is_value_comparison=False, verbose=False, ) -> "DataFrame": """Combine results into a report. Args: run_metadata: Metadata about the run and validations. source_df: Dataframe contains results of source query. target_df: Dataframe contains results of target query. join_on_fields (Sequence[str]): A collection of column names to use to join source and target. These are the columns that both the source and target queries are grouped by. is_value_comparison (boolean): Boolean representing if source and target agg values should be compared with 'equals to' rather than a 'difference' comparison. Returns: pandas.DataFrame: A pandas DataFrame with the results of the validation in the same schema as the report table. """ client = ibis.pandas.connect( { consts.RESULT_TYPE_SOURCE: source_df, consts.RESULT_TYPE_TARGET: target_df, } ) source = client.table(consts.RESULT_TYPE_SOURCE) target = client.table(consts.RESULT_TYPE_TARGET) differences_pivot = _calculate_differences( source, target, join_on_fields, run_metadata.validations, is_value_comparison ) differences_df = client.execute(differences_pivot) source_pivot = _pivot_result( source, join_on_fields, run_metadata.validations, consts.RESULT_TYPE_SOURCE ) source_pivot_df = client.execute(source_pivot) target_pivot = _pivot_result( target, join_on_fields, run_metadata.validations, consts.RESULT_TYPE_TARGET ) target_pivot_df = client.execute(target_pivot) con = ibis.pandas.connect( { consts.RESULT_TYPE_SOURCE: source_pivot_df, consts.RESULT_TYPE_DIFFERENCES: differences_df, consts.RESULT_TYPE_TARGET: target_pivot_df, } ) joined = _join_pivots( con.tables.source, con.tables.target, con.tables.differences, join_on_fields ) documented, run_metadata = _add_metadata(joined, run_metadata) if verbose: logging.debug("-- ** Combiner Query ** --") logging.debug(documented.compile()) result_df = client.execute(documented) result_df.validation_status.fillna(consts.VALIDATION_STATUS_FAIL, inplace=True) return result_df def _calculate_difference( field_differences: "IbisTable", datatype: dt.DataType, target_type: dt.DataType, validation: "ValidationMetadata", is_value_comparison: bool, ): pct_threshold = ibis.literal(validation.threshold) if datatype.is_timestamp() or datatype.is_date(): source_value = ( field_differences["differences_source_value"].epoch_seconds().cast("int64") ) target_value = ( field_differences["differences_target_value"].epoch_seconds().cast("int64") ) elif datatype.is_boolean() or (target_type and target_type.is_boolean()): source_value = field_differences["differences_source_value"].cast("boolean") target_value = field_differences["differences_target_value"].cast("boolean") elif datatype.is_decimal() or datatype.is_float64(): source_value = ( field_differences["differences_source_value"] .cast("float32") .round(digits=4) ) target_value = ( field_differences["differences_target_value"] .cast("float32") .round(digits=4) ) else: source_value = field_differences["differences_source_value"] target_value = field_differences["differences_target_value"] # Does not calculate difference between agg values for row hash due to int64 overflow if ( is_value_comparison or datatype.is_string() or isinstance(target_value, ibis.expr.types.generic.NullColumn) or isinstance(source_value, ibis.expr.types.generic.NullColumn) ): # String data types i.e "None" can be returned for NULL timestamp/datetime aggs if is_value_comparison: difference = pct_difference = ibis.null() else: difference = pct_difference = ibis.null().cast("float64") validation_status = ( ibis.case() .when( target_value.isnull() & source_value.isnull(), consts.VALIDATION_STATUS_SUCCESS, ) .when( target_value == source_value, consts.VALIDATION_STATUS_SUCCESS, ) .else_(consts.VALIDATION_STATUS_FAIL) .end() ) else: difference = (target_value - source_value).cast("float64") pct_difference_nonzero = ( ibis.literal(100.0) * difference.cast("float32") / ( source_value.case() .when(ibis.literal(0), target_value) .else_(source_value) .end() ).cast("float64") ).cast("float64") # Considers case that source and target agg values can both be 0 pct_difference = ( ibis.case() .when(difference == ibis.literal(0), ibis.literal(0).cast("float64")) .else_(pct_difference_nonzero) .end() ) th_diff = (pct_difference.abs() - pct_threshold).cast("float64") validation_status = ( ibis.case() .when( source_value.isnull() & target_value.isnull(), consts.VALIDATION_STATUS_SUCCESS, ) .when(th_diff.isnan() | (th_diff > 0.0), consts.VALIDATION_STATUS_FAIL) .else_(consts.VALIDATION_STATUS_SUCCESS) .end() ) return ( difference.name(consts.VALIDATION_DIFFERENCE), pct_difference.name(consts.VALIDATION_PCT_DIFFERENCE), pct_threshold.name(consts.VALIDATION_PCT_THRESHOLD), validation_status.name(consts.VALIDATION_STATUS), ) def _calculate_differences( source: "IbisTable", target: "IbisTable", join_on_fields: tuple, validations: "dict[ValidationMetadata]", is_value_comparison: bool, ): """Calculate differences between source and target fields. This function is separate from the "pivot" logic because we want to calculate the differences with the original data type before casting to a floating point value. The pivot casts all values to string, so the difference calculation would fail if done after that step. """ schema = source.schema() target_schema = target.schema() if join_on_fields: # Use an inner join because a row must be present in source and target # for the difference to be well defined. differences_joined = source.join(target, join_on_fields, how="inner") else: # When no join_on_fields are present, we expect only one row per table. # This is validated in generate_report before this function is called. differences_joined = source.cross_join(target) differences_pivots = [] for field, field_type in schema.items(): if field not in validations: continue target_type = target_schema.get(field, None) validation = validations[field] field_differences = differences_joined.projection( [ source[field].name("differences_source_value"), target[field].name("differences_target_value"), ] + [source[join_field] for join_field in join_on_fields] ) differences_pivots.append( field_differences[ (ibis.literal(field).name(consts.VALIDATION_NAME),) + join_on_fields + _calculate_difference( field_differences, field_type, target_type, validation, is_value_comparison, ) ] ) differences_pivot = functools.reduce( lambda pivot1, pivot2: pivot1.union(pivot2), differences_pivots ) return differences_pivot def _check_schema_names(source_df, target_df): """Check that the two input Dataframes have matching column names.""" source_names = tuple(source_df.columns) target_names = tuple(target_df.columns) if source_names != target_names: raise ValueError( "Expected source and target to have same schema, got " f"{consts.RESULT_TYPE_SOURCE}: {source_names}; {consts.RESULT_TYPE_TARGET}: {target_names}" ) def _pivot_result( result: "IbisTable", join_on_fields: tuple, validations: "dict[ValidationMetadata]", result_type: str, ): all_fields = frozenset(result.schema().names) validation_fields = ( all_fields - frozenset(join_on_fields) if "hash__all" not in join_on_fields else all_fields ) pivots = [] for field in validation_fields: if field not in validations: continue else: validation = validations[field] if validation.primary_keys: primary_keys = ( ibis.literal("{") + ibis.literal(", ").join(validation.primary_keys) + ibis.literal("}") ).name(consts.CONFIG_PRIMARY_KEYS) else: primary_keys = ( ibis.literal(None).cast("string").name(consts.CONFIG_PRIMARY_KEYS) ) pivots.append( result.projection( ( ibis.literal(field).name(consts.VALIDATION_NAME), ibis.literal(validation.validation_type).name( consts.VALIDATION_TYPE ), ibis.literal(validation.aggregation_type).name( consts.AGGREGATION_TYPE ), ibis.literal(validation.get_table_name(result_type)).name( consts.COMBINER_TABLE_NAME ), # Cast to string to ensure types match, even when column # name is NULL (such as for count aggregations). ibis.literal(validation.get_column_name(result_type)) .cast("string") .name(consts.COMBINER_COLUMN_NAME), primary_keys, ibis.literal(validation.num_random_rows).name( consts.NUM_RANDOM_ROWS ), result[field].cast("string").name(consts.COMBINER_AGG_VALUE), ) + join_on_fields ) ) pivot = functools.reduce(lambda pivot1, pivot2: pivot1.union(pivot2), pivots) return pivot def _as_json(expr): """Make field value into valid string. https://stackoverflow.com/a/3020108/101923 """ return ( expr.cast("string") .fillna("null") .re_replace(r"\\", r"\\\\") .re_replace('"', '\\"') ) def _join_pivots( source: "IbisTable", target: "IbisTable", differences: "IbisTable", join_on_fields: tuple, ): if join_on_fields: join_values = [] for field in join_on_fields: join_values.append( ibis.literal(json.dumps(field)) + ibis.literal(': "') + _as_json(target[field]) + ibis.literal('"') ) group_by_columns = ( ibis.literal("{") + ibis.literal(", ").join(join_values) + ibis.literal("}") ).name(consts.GROUP_BY_COLUMNS) else: group_by_columns = ( ibis.literal(None).cast("string").name(consts.GROUP_BY_COLUMNS) ) join_keys = (consts.VALIDATION_NAME,) + join_on_fields source_difference = source.join(differences, join_keys, how="outer")[ [source[field] for field in join_keys] + [ source[consts.VALIDATION_TYPE], source[consts.AGGREGATION_TYPE], source[consts.COMBINER_TABLE_NAME], source[consts.COMBINER_COLUMN_NAME], source[consts.CONFIG_PRIMARY_KEYS], source[consts.NUM_RANDOM_ROWS], source[consts.COMBINER_AGG_VALUE], differences[consts.VALIDATION_DIFFERENCE], differences[consts.VALIDATION_PCT_DIFFERENCE], differences[consts.VALIDATION_PCT_THRESHOLD], differences[consts.VALIDATION_STATUS], ] ] joined = source_difference.join(target, join_keys, how="outer")[ source_difference[consts.VALIDATION_NAME], source_difference[consts.VALIDATION_TYPE] .fillna(target[consts.VALIDATION_TYPE]) .name(consts.VALIDATION_TYPE), source_difference[consts.AGGREGATION_TYPE] .fillna(target[consts.AGGREGATION_TYPE]) .name(consts.AGGREGATION_TYPE), source_difference[consts.COMBINER_TABLE_NAME].name(consts.SOURCE_TABLE_NAME), source_difference[consts.COMBINER_COLUMN_NAME].name(consts.SOURCE_COLUMN_NAME), source_difference[consts.COMBINER_AGG_VALUE].name(consts.SOURCE_AGG_VALUE), target[consts.COMBINER_TABLE_NAME].name(consts.TARGET_TABLE_NAME), target[consts.COMBINER_COLUMN_NAME].name(consts.TARGET_COLUMN_NAME), target[consts.COMBINER_AGG_VALUE].name(consts.TARGET_AGG_VALUE), group_by_columns, source_difference[consts.CONFIG_PRIMARY_KEYS], source_difference[consts.NUM_RANDOM_ROWS], source_difference[consts.VALIDATION_DIFFERENCE], source_difference[consts.VALIDATION_PCT_DIFFERENCE], source_difference[consts.VALIDATION_PCT_THRESHOLD], source_difference[consts.VALIDATION_STATUS], ] return joined def _add_metadata(joined: "IbisTable", run_metadata: "RunMetadata"): # TODO: Add source and target queries to metadata run_metadata.end_time = datetime.datetime.now(datetime.timezone.utc) joined = joined[ joined, ibis.literal(run_metadata.run_id).name(consts.CONFIG_RUN_ID), ibis.literal(run_metadata.labels).name(consts.CONFIG_LABELS), ibis.literal(run_metadata.start_time).name(consts.CONFIG_START_TIME), ibis.literal(run_metadata.end_time).name(consts.CONFIG_END_TIME), ] return (joined, run_metadata) def _get_summary( run_metadata: "RunMetadata", result_df: "DataFrame", source_df: "DataFrame", target_df: "DataFrame", ): """Logs a summary report/stats of row validation results.""" try: if result_df.empty: return if (result_df.loc[0, consts.VALIDATION_TYPE] == consts.ROW_VALIDATION) or ( # Check for custom-query row validation which always should have primary keys (not null) result_df.loc[0, consts.VALIDATION_TYPE] == consts.CUSTOM_QUERY and result_df.loc[0, consts.CONFIG_PRIMARY_KEYS] ): # Vectorized calculations for all counts. success_condition = ( result_df[consts.VALIDATION_STATUS] == consts.VALIDATION_STATUS_SUCCESS ) fail_condition = ~success_condition # Invert success for fail condition. source_not_in_target = ( result_df[consts.SOURCE_AGG_VALUE].notnull() & result_df[consts.TARGET_AGG_VALUE].isnull() ) target_not_in_source = ( result_df[consts.SOURCE_AGG_VALUE].isnull() & result_df[consts.TARGET_AGG_VALUE].notnull() ) present_in_both_tables = ( result_df[consts.SOURCE_AGG_VALUE].notnull() & result_df[consts.TARGET_AGG_VALUE].notnull() ) logging.info( json.dumps( { consts.CONFIG_RUN_ID: run_metadata.run_id, consts.CONFIG_START_TIME: run_metadata.start_time.isoformat(), consts.CONFIG_END_TIME: run_metadata.end_time.isoformat(), # Explicit conversion of numpy's int64 values to int for JSON serializability consts.TOTAL_SOURCE_ROWS: int(source_df.shape[0]), consts.TOTAL_TARGET_ROWS: int(target_df.shape[0]), consts.TOTAL_ROWS_VALIDATED: int(result_df.shape[0]), # Using .sum() on boolean Series for much faster counting consts.TOTAL_ROWS_SUCCESS: int(success_condition.sum()), consts.TOTAL_ROWS_FAIL: int(fail_condition.sum()), consts.FAILED_SOURCE_NOT_IN_TARGET: int( (fail_condition & source_not_in_target).sum() ), consts.FAILED_TARGET_NOT_IN_SOURCE: int( (fail_condition & target_not_in_source).sum() ), consts.FAILED_PRESENT_IN_BOTH_TABLES: int( (fail_condition & present_in_both_tables).sum() ), } ) ) except Exception as e: logging.warning( f"{COMBINER_GET_SUMMARY_EXC_TEXT}: {e}", exc_info=True, )