in data_validation/validation_builder.py [0:0]
def add_aggregate(self, aggregate_field):
    """Add an aggregate field to the source and target queries.

    Builds one aggregate for the source column and one for the target
    column (sharing the same alias and cast), registers each on its
    respective query builder, and records validation metadata under the
    alias.

    Args:
        aggregate_field (dict): A dictionary with source, target, and
            aggregation info. The alias, source column, and target column
            entries are read by subscript (required); type and cast are
            read with ``.get`` (optional lookups).

    Raises:
        Exception: If the aggregation type is missing, is not a string, or
            does not name an attribute of ``AggregateField``.
    """
    alias = aggregate_field[consts.CONFIG_FIELD_ALIAS]
    source_field_name = aggregate_field[consts.CONFIG_SOURCE_COLUMN]
    target_field_name = aggregate_field[consts.CONFIG_TARGET_COLUMN]
    aggregate_type = aggregate_field.get(consts.CONFIG_TYPE)
    cast = aggregate_field.get(consts.CONFIG_CAST)

    # The type comes from ``.get`` and may be None; hasattr/getattr raise
    # TypeError for non-string names, so guard before the lookup to make a
    # missing type surface as the same "unknown type" error.
    aggregate_factory = (
        getattr(AggregateField, aggregate_type, None)
        if isinstance(aggregate_type, str)
        else None
    )
    if aggregate_factory is None:
        raise Exception("Unknown Aggregation Type: {}".format(aggregate_type))

    # Source and target share alias and cast; only the column name differs.
    source_agg = aggregate_factory(
        field_name=source_field_name, alias=alias, cast=cast
    )
    target_agg = aggregate_factory(
        field_name=target_field_name, alias=alias, cast=cast
    )
    self.source_builder.add_aggregate_field(source_agg)
    self.target_builder.add_aggregate_field(target_agg)

    # Metadata is keyed by alias, so a repeated alias overwrites the entry.
    self._metadata[alias] = metadata.ValidationMetadata(
        validation_type=self.validation_type,
        aggregation_type=aggregate_type,
        source_table_schema=self.config_manager.source_schema,
        source_table_name=self.config_manager.source_table,
        target_table_schema=self.config_manager.target_schema,
        target_table_name=self.config_manager.target_table,
        source_column_name=source_field_name,
        target_column_name=target_field_name,
        primary_keys=self.config_manager.get_primary_keys_list(),
        num_random_rows=self.config_manager.get_random_row_batch_size(),
        threshold=self.config_manager.threshold,
    )