in data_validation/data_validation.py [0:0]
def _execute_validation(self, validation_builder):
    """Execute Against a Supplied Validation Builder.

    Stores the builder's metadata on ``self.run_metadata.validations``,
    runs the source and target queries concurrently, and hands both
    results to ``combiner.generate_report``.

    Args:
        validation_builder: Object supplying ``get_metadata()``,
            ``get_source_query()``, ``get_target_query()``,
            ``get_primary_keys()`` and ``get_group_aliases()``.

    Returns:
        Whatever ``combiner.generate_report`` returns (presumably a
        DataFrame holding the validation report -- confirm against combiner).

    Raises:
        Exception: Any error raised while generating the report is
            re-raised unchanged; when ``self.verbose`` is set, both query
            results and their dtypes are logged first. Errors raised by
            either query propagate directly from ``Future.result()``.
    """
    self.run_metadata.validations = validation_builder.get_metadata()

    source_query = validation_builder.get_source_query()
    target_query = validation_builder.get_target_query()

    config = self.config_manager

    # Row validations (configured directly, or as a custom query of type
    # "row") compare source and target values and are joined on the primary
    # keys; every other validation is joined on its group aliases.
    # Computed once so the join keys and the comparison flag cannot drift.
    is_value_comparison = config.validation_type == consts.ROW_VALIDATION or (
        config.validation_type == consts.CUSTOM_QUERY
        and config.custom_query_type == "row"
    )
    join_on_fields = set(
        validation_builder.get_primary_keys()
        if is_value_comparison
        else validation_builder.get_group_aliases()
    )

    with ThreadPoolExecutor() as executor:
        # Submit the two query network calls concurrently
        source_future = executor.submit(
            util.timed_call,
            "Source query",
            config.source_client.execute,
            source_query,
        )
        target_future = executor.submit(
            util.timed_call,
            "Target query",
            config.target_client.execute,
            target_query,
        )
        # result() blocks until the query finishes and re-raises any
        # exception the worker thread hit.
        source_df = source_future.result()
        target_df = target_future.result()

    try:
        result_df = util.timed_call(
            "Generate report",
            combiner.generate_report,
            self.run_metadata,
            source_df,
            target_df,
            join_on_fields=join_on_fields,
            is_value_comparison=is_value_comparison,
            verbose=self.verbose,
        )
    except Exception:
        # Dump both inputs to help diagnose the failure, then let the
        # original exception propagate with its traceback intact.
        if self.verbose:
            logging.error("-- ** Logging Source DF ** --")
            logging.error(source_df.dtypes)
            logging.error(source_df)
            logging.error("-- ** Logging Target DF ** --")
            logging.error(target_df.dtypes)
            logging.error(target_df)
        raise

    return result_df