in data_validation/validation_builder.py [0:0]
def add_filter(self, filter_field):
    """Add FilterField to Queries

    Builds one filter for the source query and one for the target query
    from a single config entry, then registers each with its builder.

    Args:
        filter_field (Dict): An object with source and target filter details.
            The value under consts.CONFIG_TYPE selects which keys are read:
            * consts.FILTER_TYPE_CUSTOM: CONFIG_FILTER_SOURCE and
              CONFIG_FILTER_TARGET, passed to FilterField.custom.
            * consts.FILTER_TYPE_EQUALS: the source/target *_COLUMN and
              *_VALUE keys, passed to FilterField.equal_to.
            * consts.FILTER_TYPE_ISIN: the same *_COLUMN and *_VALUE keys,
              passed to self._construct_isin_filter together with the
              matching client.

    Raises:
        KeyError: If a key required by the selected filter type is missing.
        ValueError: If the filter type is not one of the types listed above.
    """
    filter_type = filter_field[consts.CONFIG_TYPE]

    if filter_type == consts.FILTER_TYPE_CUSTOM:
        source_filter = FilterField.custom(
            filter_field[consts.CONFIG_FILTER_SOURCE]
        )
        target_filter = FilterField.custom(
            filter_field[consts.CONFIG_FILTER_TARGET]
        )
    elif filter_type == consts.FILTER_TYPE_EQUALS:
        source_filter = FilterField.equal_to(
            filter_field[consts.CONFIG_FILTER_SOURCE_COLUMN],
            filter_field[consts.CONFIG_FILTER_SOURCE_VALUE],
        )
        target_filter = FilterField.equal_to(
            filter_field[consts.CONFIG_FILTER_TARGET_COLUMN],
            filter_field[consts.CONFIG_FILTER_TARGET_VALUE],
        )
    elif filter_type == consts.FILTER_TYPE_ISIN:
        source_filter = self._construct_isin_filter(
            self.source_client,
            filter_field[consts.CONFIG_FILTER_SOURCE_COLUMN],
            filter_field[consts.CONFIG_FILTER_SOURCE_VALUE],
        )
        target_filter = self._construct_isin_filter(
            self.target_client,
            filter_field[consts.CONFIG_FILTER_TARGET_COLUMN],
            filter_field[consts.CONFIG_FILTER_TARGET_VALUE],
        )
    else:
        # Without this branch an unrecognized type would leave both filter
        # variables unbound and surface as an UnboundLocalError below,
        # hiding the actual configuration mistake.
        raise ValueError(
            "Unsupported filter type: {!r}".format(filter_type)
        )

    # TODO(issues/40): Add metadata around filters
    # Both filters are fully built before either builder is modified, so a
    # failure above never leaves the source and target queries out of step.
    self.source_builder.add_filter_field(source_filter)
    self.target_builder.add_filter_field(target_filter)