in data_validation/data_validation.py [0:0]
def _add_random_row_filter(self):
    """Limit the validation to a random sample of primary key values.

    Builds a random-row query against the source (either the configured
    custom query or the source table), executes it, and registers an ISIN
    filter on the validation builder using the sampled values of the first
    primary key column for both the source and target columns.

    Returns without adding a filter when the sample query yields no rows.

    Raises:
        ValueError: If the config manager has no primary keys.
    """
    config = self.config_manager
    if not config.primary_keys:
        raise ValueError("Primary Keys are required for Random Row Filters")

    # Only the first primary key takes part (multi-pk filter not supported).
    first_pk = config.primary_keys[0]
    source_pk_column = first_pk[consts.CONFIG_SOURCE_COLUMN]
    target_pk_column = first_pk[consts.CONFIG_TARGET_COLUMN]

    row_sampler = RandomRowBuilder(
        [source_pk_column],
        config.random_row_batch_size(),
    )

    # `and` keeps the short-circuit: custom_query_type is only read for
    # custom query validations.
    is_custom_row_query = (config.validation_type == consts.CUSTOM_QUERY) and (
        config.custom_query_type == consts.ROW_VALIDATION.lower()
    )
    if is_custom_row_query:
        sample_query = row_sampler.compile_custom_query(
            config.source_client,
            config.source_query,
        )
    else:
        sample_query = row_sampler.compile(
            config.source_client,
            config.source_schema,
            config.source_table,
            self.validation_builder.source_builder,
        )

    # A BINARY primary key on the source is force cast to STRING (HEX) for
    # the sample query; the sampled values are cast back to binary below.
    pk_is_binary = sample_query[source_pk_column].type().is_binary()
    if pk_is_binary:
        pk_as_string = sample_query[source_pk_column].cast("string")
        sample_query = sample_query.mutate(**{source_pk_column: pk_as_string})

    # Trimming runs after the optional cast so it sees the string form.
    if config.trim_string_pks():
        trimmed_pk = sample_query[source_pk_column].rstrip()
        sample_query = sample_query.mutate(**{source_pk_column: trimmed_pk})

    sampled_rows = config.source_client.execute(sample_query)
    # len() rather than truthiness: the result is presumably a DataFrame,
    # whose truth value is ambiguous -- TODO confirm against the client.
    if len(sampled_rows) == 0:
        return

    pk_values = list(sampled_rows[source_pk_column])
    if pk_is_binary:
        # The IN list currently holds the string form of each binary id;
        # wrap each one in a literal cast back to binary.
        pk_values = [ibis.literal(value).cast("binary") for value in pk_values]

    # The same value list is used for both sides of the filter.
    self.validation_builder.add_filter(
        {
            consts.CONFIG_TYPE: consts.FILTER_TYPE_ISIN,
            consts.CONFIG_FILTER_SOURCE_COLUMN: source_pk_column,
            consts.CONFIG_FILTER_SOURCE_VALUE: pk_values,
            consts.CONFIG_FILTER_TARGET_COLUMN: target_pk_column,
            consts.CONFIG_FILTER_TARGET_VALUE: pk_values,
        }
    )