def _execute_validation()

in data_validation/data_validation.py [0:0]


    def _execute_validation(self, validation_builder):
        """Execute Against a Supplied Validation Builder"""

        self.run_metadata.validations = validation_builder.get_metadata()

        source_query = validation_builder.get_source_query()
        target_query = validation_builder.get_target_query()

        join_on_fields = (
            set(validation_builder.get_primary_keys())
            if (self.config_manager.validation_type == consts.ROW_VALIDATION)
            or (
                self.config_manager.validation_type == consts.CUSTOM_QUERY
                and self.config_manager.custom_query_type == "row"
            )
            else set(validation_builder.get_group_aliases())
        )

        # If row validation from YAML, compare source and target agg values
        is_value_comparison = (
            self.config_manager.validation_type == consts.ROW_VALIDATION
            or (
                self.config_manager.validation_type == consts.CUSTOM_QUERY
                and self.config_manager.custom_query_type == "row"
            )
        )

        futures = []
        with ThreadPoolExecutor() as executor:
            # Submit the two query network calls concurrently
            futures.append(
                executor.submit(
                    util.timed_call,
                    "Source query",
                    self.config_manager.source_client.execute,
                    source_query,
                )
            )
            futures.append(
                executor.submit(
                    util.timed_call,
                    "Target query",
                    self.config_manager.target_client.execute,
                    target_query,
                )
            )
            source_df = futures[0].result()
            target_df = futures[1].result()

        try:
            result_df = util.timed_call(
                "Generate report",
                combiner.generate_report,
                self.run_metadata,
                source_df,
                target_df,
                join_on_fields=join_on_fields,
                is_value_comparison=is_value_comparison,
                verbose=self.verbose,
            )
        except Exception as e:
            if self.verbose:
                logging.error("-- ** Logging Source DF ** --")
                logging.error(source_df.dtypes)
                logging.error(source_df)
                logging.error("-- ** Logging Target DF ** --")
                logging.error(target_df.dtypes)
                logging.error(target_df)
            raise e

        return result_df