def get_pre_build_configs()

in data_validation/cli_tools.py [0:0]


def get_pre_build_configs(args: "Namespace", validate_cmd: str) -> List[Dict]:
    """Assemble the pre-build configurations used to build ConfigManager objects.

    Args:
        args: Parsed command-line arguments.
        validate_cmd: Validation type, one of "Schema", "Column", "Row" or
            "Custom-query". When None, it is derived from
            ``args.validate_cmd.capitalize()``.

    Returns:
        A list of configuration dicts. Each entry of the table list (or the
        custom query) contributes one dict, or several when
        ``_concat_column_count_configs`` splits the validation.

    Raises:
        ValueError: If the validation type is not recognised, or if
            ``args.filter_status`` contains a value that is not in
            ``consts.VALIDATION_STATUSES``.
    """

    def resolve_columns(column_arg: str, client, table: dict, query: str) -> list:
        """Return the column names that a concat/hash argument refers to."""
        if column_arg != "*":
            return get_arg_list(column_arg)
        # "*" has to be expanded to real column names so they can be counted.
        if table:
            schema = clients.get_ibis_table_schema(
                client,
                table["schema_name"],
                table["table_name"],
            )
        else:
            schema = clients.get_ibis_query_schema(
                client,
                query,
            )
        return schema.names

    # generate-table-partitions passes validate_cmd explicitly (row or
    # custom-query, depending on what is being partitioned). Every other
    # caller passes None and the value is taken from the parsed arguments.
    if validate_cmd is None:
        validate_cmd = args.validate_cmd.capitalize()

    config_types = {
        "Schema": consts.SCHEMA_VALIDATION,
        "Column": consts.COLUMN_VALIDATION,
        "Row": consts.ROW_VALIDATION,
        "Custom-query": consts.CUSTOM_QUERY,
    }
    if validate_cmd not in config_types:
        raise ValueError(f"Unknown Validation Type: {validate_cmd}")
    config_type = config_types[validate_cmd]
    is_custom_query = config_type == consts.CUSTOM_QUERY

    # Fall back to the legacy -bqrh option when no result handler was given.
    if not args.result_handler:
        args.result_handler = args.bq_result_handler
    result_handler_config = (
        _get_result_handler(args.result_handler, args.service_account)
        if args.result_handler
        else None
    )

    # Filters and threshold are not supported for schema validation, hence
    # the defaults when the attributes are absent.
    filter_config = getattr(args, consts.CONFIG_FILTERS, [])
    threshold = getattr(args, consts.CONFIG_THRESHOLD, 0.0)

    labels = [] if args.labels is None else get_labels(args.labels)

    # Source and target clients are built from the stored connection configs.
    mgr = state_manager.StateManager()
    source_conn_config = mgr.get_connection_config(args.source_conn)
    source_client = clients.get_data_client(source_conn_config)
    target_conn_config = mgr.get_connection_config(args.target_conn)
    target_client = clients.get_data_client(target_conn_config)

    # Output format: text, csv, json, table. Table is the default.
    output_format = args.format or consts.FORMAT_TYPE_TABLE

    # Random row arguments can only be present for row validations. The
    # "use_random_row" attribute name is kept as-is to avoid a breaking
    # change; consts.py carries the detailed explanation.
    use_random_rows = getattr(args, "use_random_row", False)
    random_row_batch_size = getattr(args, consts.CONFIG_RANDOM_ROW_BATCH_SIZE, None)

    # A custom query validation has no table list; it carries a query instead.
    is_filesystem = source_client._source_type == "FileSystem"
    tables_list = get_tables_list(
        None if is_custom_query else args.tables_list,
        default_value=[{}],
        is_filesystem=is_filesystem,
    )
    query_str = (
        get_query_from_query_args(args.source_query, args.source_query_file)
        if is_custom_query
        else None
    )

    # Validation status filter: every requested status must be a known one.
    filter_status = None
    if args.filter_status is not None:
        filter_status = get_arg_list(args.filter_status)
        if any(
            status not in consts.VALIDATION_STATUSES for status in filter_status
        ):
            raise ValueError("An unsupported status was provided")

    if not is_custom_query:
        tables_list = find_tables.expand_tables_of_asterisk(
            tables_list, source_client, target_client
        )

    pre_build_configs_list = []
    for table_obj in tables_list:
        pre_build_configs = {
            "config_type": config_type,
            consts.CONFIG_SOURCE_CONN_NAME: args.source_conn,
            consts.CONFIG_TARGET_CONN_NAME: args.target_conn,
            "table_obj": table_obj,
            consts.CONFIG_LABELS: labels,
            consts.CONFIG_THRESHOLD: threshold,
            consts.CONFIG_FORMAT: output_format,
            consts.CONFIG_USE_RANDOM_ROWS: use_random_rows,
            consts.CONFIG_RANDOM_ROW_BATCH_SIZE: random_row_batch_size,
            "source_client": source_client,
            "target_client": target_client,
            "result_handler_config": result_handler_config,
            "filter_config": filter_config,
            consts.CONFIG_FILTER_STATUS: filter_status,
            consts.CONFIG_TRIM_STRING_PKS: getattr(
                args, consts.CONFIG_TRIM_STRING_PKS, False
            ),
            consts.CONFIG_CASE_INSENSITIVE_MATCH: getattr(
                args, consts.CONFIG_CASE_INSENSITIVE_MATCH, False
            ),
            consts.CONFIG_ROW_CONCAT: getattr(args, consts.CONFIG_ROW_CONCAT, None),
            consts.CONFIG_ROW_HASH: getattr(args, consts.CONFIG_ROW_HASH, None),
            consts.CONFIG_RUN_ID: getattr(args, consts.CONFIG_RUN_ID, None),
            "verbose": args.verbose,
        }

        concat_arg = pre_build_configs[consts.CONFIG_ROW_CONCAT]
        hash_arg = pre_build_configs[consts.CONFIG_ROW_HASH]
        if not (concat_arg or hash_arg):
            # No row concat/hash requested: the config is used unchanged.
            pre_build_configs_list.append(pre_build_configs)
            continue

        # Ensure we don't have too many columns for the engines involved.
        cols = resolve_columns(
            hash_arg or concat_arg,
            source_client,
            table_obj,
            query_str,
        )
        split_key = consts.CONFIG_ROW_HASH if args.hash else consts.CONFIG_ROW_CONCAT
        column_limit = _max_concat_columns(
            args.max_concat_columns, source_client, target_client
        )
        split_configs = _concat_column_count_configs(
            cols,
            pre_build_configs,
            split_key,
            column_limit,
        )
        if len(split_configs) > 1:
            if table_obj:
                message_type = f'{table_obj["schema_name"]}.{table_obj["table_name"]}'
            else:
                message_type = "custom query"
            logging.info(
                f"Splitting validation into {len(split_configs)} queries for {message_type}"
            )
        pre_build_configs_list.extend(split_configs)

    return pre_build_configs_list