in data_validation/data_validation.py [0:0]
def execute_recursive_validation(self, validation_builder, grouped_fields):
    """Recursively run Row validations, drilling into groups that differ.

    This method executes aggregate queries, such as sum-of-hashes, on the
    source and target tables. Where they differ, the next grouping field is
    added to the GROUP BY clause recursively until the individual row
    differences can be identified.

    Args:
        validation_builder: Builder for the query to execute. When
            ``grouped_fields`` is non-empty, ``grouped_fields[0]`` is added
            to it as a query group; deeper levels operate on clones.
        grouped_fields: Sequence of fields still available for grouping.
            The first one is used at this level and the remainder is
            passed to the recursive calls.

    Returns:
        A pandas DataFrame concatenating the results collected at this
        level and below. An empty query result is returned unchanged.
        Returns None (after emitting a ``UserWarning``) when there are no
        fields left to group by and no primary keys are configured, and
        also when every mismatching group ended in that situation so no
        result was collected.
    """
    past_results = []

    if grouped_fields:
        validation_builder.add_query_group(grouped_fields[0])
        result_df = self._execute_validation(validation_builder)

        if result_df.empty:
            # Nothing to compare; hand back the empty frame instead of
            # letting pandas.concat fail on an empty list.
            return result_df

        for grouped_key in result_df[consts.GROUP_BY_COLUMNS].unique():
            # Validations are viewed separately, but queried together.
            # We must treat them as a single item which failed or succeeded.
            grouped_key_df = result_df[
                result_df[consts.GROUP_BY_COLUMNS] == grouped_key
            ]

            # Groups flagged by query_too_large are kept as-is and not
            # expanded any further.
            if self.query_too_large(grouped_key_df, grouped_fields):
                past_results.append(grouped_key_df)
                continue

            # First row whose source and target aggregates disagree, if any.
            # "records" is the full orient name; the abbreviation "row" is
            # rejected by pandas >= 2.0.
            mismatched_row = next(
                (
                    row
                    for row in grouped_key_df.to_dict(orient="records")
                    if row[consts.SOURCE_AGG_VALUE] != row[consts.TARGET_AGG_VALUE]
                ),
                None,
            )
            if mismatched_row is None:
                past_results.append(grouped_key_df)
                continue

            # Narrow a copy of the query to the failing group and recurse
            # with the remaining grouping fields.
            recursive_validation_builder = validation_builder.clone()
            self._add_recursive_validation_filter(
                recursive_validation_builder, mismatched_row
            )
            recursive_result = self.execute_recursive_validation(
                recursive_validation_builder, grouped_fields[1:]
            )
            # The recursion yields None when it could not go deeper (no
            # primary keys); there is nothing to collect in that case.
            if recursive_result is not None:
                past_results.append(recursive_result)
    elif self.config_manager.primary_keys:
        past_results.append(self._execute_validation(validation_builder))
    else:
        warnings.warn(
            "WARNING: No Primary Keys Suppplied in Row Validation", UserWarning
        )
        return None

    if not past_results:
        return None
    return pandas.concat(past_results)