in data_validation/cli_tools.py [0:0]
def get_pre_build_configs(args: "Namespace", validate_cmd: str) -> List[Dict]:
"""Return a dict of configurations to build ConfigManager object"""
def cols_from_arg(concat_arg: str, client, table_obj: dict, query_str: str) -> list:
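"""Return the columns named in concat_arg, expanding "*" via the client's table or query schema."""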
if concat_arg == "*":
# Validating with "*": expand to the real column list so we can count the columns.
if table_obj:
return clients.get_ibis_table_schema(
client,
table_obj["schema_name"],
table_obj["table_name"],
).names
else:
return clients.get_ibis_query_schema(
client,
query_str,
).names
else:
return get_arg_list(concat_arg)
# validate_cmd will be set to 'Row' or 'Custom-query' if invoked by generate-table-partitions,
# depending on what is being partitioned. Otherwise validate_cmd will be None.
if validate_cmd is None:
validate_cmd = args.validate_cmd.capitalize()
if validate_cmd == "Schema":
config_type = consts.SCHEMA_VALIDATION
elif validate_cmd == "Column":
config_type = consts.COLUMN_VALIDATION
elif validate_cmd == "Row":
config_type = consts.ROW_VALIDATION
elif validate_cmd == "Custom-query":
config_type = consts.CUSTOM_QUERY
else:
raise ValueError(f"Unknown Validation Type: {validate_cmd}")
# Cater for legacy -bqrh.
args.result_handler = args.result_handler or args.bq_result_handler
# Get result handler config
if args.result_handler:
result_handler_config = _get_result_handler(
args.result_handler, args.service_account
)
else:
result_handler_config = None
# Set filter_config and threshold. Not supported for schema validation.
filter_config = getattr(args, consts.CONFIG_FILTERS, [])
threshold = getattr(args, consts.CONFIG_THRESHOLD, 0.0)
# Get labels
if args.labels is None:
labels = []
else:
labels = get_labels(args.labels)
# Get source and target clients
mgr = state_manager.StateManager()
source_client = clients.get_data_client(mgr.get_connection_config(args.source_conn))
target_client = clients.get_data_client(mgr.get_connection_config(args.target_conn))
# Get format: text, csv, json, table. Default is table
format = args.format if args.format else consts.FORMAT_TYPE_TABLE
# Get random row arguments. These attributes are only present for row validations.
# Bad coding here, but kept so as not to introduce a breaking change. See
# consts.py Line 17 for a more detailed explanation.
use_random_rows = getattr(args, "use_random_row", False)
random_row_batch_size = getattr(args, consts.CONFIG_RANDOM_ROW_BATCH_SIZE, None)
# Get table list. Not supported for custom query validation.
is_filesystem = source_client._source_type == "FileSystem"
query_str = None
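# Custom-query validations use a placeholder table entry and carry the source query text instead.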
if config_type == consts.CUSTOM_QUERY:
tables_list = get_tables_list(
None, default_value=[{}], is_filesystem=is_filesystem
)
query_str = get_query_from_query_args(args.source_query, args.source_query_file)
else:
tables_list = get_tables_list(
args.tables_list, default_value=[{}], is_filesystem=is_filesystem
)
# Get validation filter status: success, fail
if args.filter_status is not None:
arg_list = get_arg_list(args.filter_status)
if all(arg in consts.VALIDATION_STATUSES for arg in arg_list):
filter_status = arg_list
else:
raise ValueError("An unsupported status was provided")
else:
filter_status = None
pre_build_configs_list = []
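# For table-based validations, expand any "*" entries in the table list into concrete tables.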
if config_type != consts.CUSTOM_QUERY:
tables_list = find_tables.expand_tables_of_asterisk(
tables_list, source_client, target_client
)
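# Build one pre-build config dict per table entry (or per custom query).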
for table_obj in tables_list:
pre_build_configs = {
"config_type": config_type,
consts.CONFIG_SOURCE_CONN_NAME: args.source_conn,
consts.CONFIG_TARGET_CONN_NAME: args.target_conn,
"table_obj": table_obj,
consts.CONFIG_LABELS: labels,
consts.CONFIG_THRESHOLD: threshold,
consts.CONFIG_FORMAT: format,
consts.CONFIG_USE_RANDOM_ROWS: use_random_rows,
consts.CONFIG_RANDOM_ROW_BATCH_SIZE: random_row_batch_size,
"source_client": source_client,
"target_client": target_client,
"result_handler_config": result_handler_config,
"filter_config": filter_config,
consts.CONFIG_FILTER_STATUS: filter_status,
consts.CONFIG_TRIM_STRING_PKS: getattr(
args, consts.CONFIG_TRIM_STRING_PKS, False
),
consts.CONFIG_CASE_INSENSITIVE_MATCH: getattr(
args, consts.CONFIG_CASE_INSENSITIVE_MATCH, False
),
consts.CONFIG_ROW_CONCAT: getattr(args, consts.CONFIG_ROW_CONCAT, None),
consts.CONFIG_ROW_HASH: getattr(args, consts.CONFIG_ROW_HASH, None),
consts.CONFIG_RUN_ID: getattr(args, consts.CONFIG_RUN_ID, None),
"verbose": args.verbose,
}
if (
pre_build_configs[consts.CONFIG_ROW_CONCAT]
or pre_build_configs[consts.CONFIG_ROW_HASH]
):
# Ensure we don't have too many columns for the engines involved.
cols = cols_from_arg(
pre_build_configs[consts.CONFIG_ROW_HASH]
or pre_build_configs[consts.CONFIG_ROW_CONCAT],
source_client,
table_obj,
query_str,
)
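# This may split the work into several configs when the column count exceeds the engines' concat limit.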
new_pre_build_configs = _concat_column_count_configs(
cols,
pre_build_configs,
consts.CONFIG_ROW_HASH if args.hash else consts.CONFIG_ROW_CONCAT,
_max_concat_columns(
args.max_concat_columns, source_client, target_client
),
)
if len(new_pre_build_configs) > 1:
message_type = (
f'{table_obj["schema_name"]}.{table_obj["table_name"]}'
if table_obj
else "custom query"
)
logging.info(
f"Splitting validation into {len(new_pre_build_configs)} queries for {message_type}"
)
pre_build_configs_list.extend(new_pre_build_configs)
else:
pre_build_configs_list.append(pre_build_configs)
return pre_build_configs_list