def add_aggregate()

in data_validation/validation_builder.py [0:0]


    def add_aggregate(self, aggregate_field):
        """Add an aggregate field to the source and target queries.

        Builds one aggregate per side (same aggregation type, alias, and cast;
        each side uses its own column name), registers them with the source
        and target builders, and stores a ``ValidationMetadata`` entry for the
        aggregate under its alias.

        Args:
            aggregate_field (dict): A dictionary with source, target, and
                aggregation info. The alias, source column, and target column
                entries are required; the type and cast entries are read with
                ``.get`` and may be absent.

        Raises:
            KeyError: If the alias, source column, or target column entry is
                missing from ``aggregate_field``.
            Exception: If the aggregation type is missing or does not name a
                callable attribute of ``AggregateField``.
        """
        alias = aggregate_field[consts.CONFIG_FIELD_ALIAS]
        source_field_name = aggregate_field[consts.CONFIG_SOURCE_COLUMN]
        target_field_name = aggregate_field[consts.CONFIG_TARGET_COLUMN]
        aggregate_type = aggregate_field.get(consts.CONFIG_TYPE)
        cast = aggregate_field.get(consts.CONFIG_CAST)

        # hasattr()/getattr() raise TypeError for a non-string attribute name,
        # so a config without a type must be rejected before the lookup to get
        # the intended "Unknown Aggregation Type" error.
        build_aggregate = (
            getattr(AggregateField, aggregate_type, None)
            if isinstance(aggregate_type, str)
            else None
        )
        if not callable(build_aggregate):
            raise Exception("Unknown Aggregation Type: {}".format(aggregate_type))

        # Both sides share the alias so their results are keyed identically.
        source_agg = build_aggregate(
            field_name=source_field_name, alias=alias, cast=cast
        )
        target_agg = build_aggregate(
            field_name=target_field_name, alias=alias, cast=cast
        )

        self.source_builder.add_aggregate_field(source_agg)
        self.target_builder.add_aggregate_field(target_agg)
        self._metadata[alias] = metadata.ValidationMetadata(
            validation_type=self.validation_type,
            aggregation_type=aggregate_type,
            source_table_schema=self.config_manager.source_schema,
            source_table_name=self.config_manager.source_table,
            target_table_schema=self.config_manager.target_schema,
            target_table_name=self.config_manager.target_table,
            source_column_name=source_field_name,
            target_column_name=target_field_name,
            primary_keys=self.config_manager.get_primary_keys_list(),
            num_random_rows=self.config_manager.get_random_row_batch_size(),
            threshold=self.config_manager.threshold,
        )