# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""The Data Validation CLI tool is intended to help to build and execute
data validation runs with ease.
The Data Validator can be called either using:
data-validation -h
python -m data_validation -h
ex.
Step 1) Store Connection to be used in validation
data-validation connections add -c my_bq_conn BigQuery --project-id pso-project
Step 2) Run Validation using supplied connections
data-validation validate column -sc my_bq_conn -tc my_bq_conn \
-tbls bigquery-public-data.new_york_citibike.citibike_trips,bigquery-public-data.new_york_citibike.citibike_stations \
--sum '*' --count '*'
python -m data_validation validate column -sc my_bq_conn -tc my_bq_conn \
-tbls bigquery-public-data.new_york_citibike.citibike_trips \
--grouped-columns starttime \
--sum tripduration --count tripduration
data-validation validate column \
-sc my_bq_conn -tc my_bq_conn \
-tbls bigquery-public-data.new_york_citibike.citibike_trips,bigquery-public-data.new_york_citibike.citibike_stations \
--sum tripduration,start_station_name --count tripduration,start_station_name \
-rh pso-project.pso_data_validator.results \
-c ex_yaml.yaml
command:
data-validation
"""
import argparse
import copy
import csv
import json
import logging
import sys
import uuid
import os
import math
from typing import Dict, List, Optional, TYPE_CHECKING
from yaml import Dumper, Loader, dump, load
from data_validation import (
clients,
consts,
exceptions,
find_tables,
state_manager,
gcs_helper,
util,
)
from data_validation.validation_builder import list_to_sublists
if TYPE_CHECKING:
from argparse import Namespace
CONNECTION_SOURCE_FIELDS = {
consts.SOURCE_TYPE_BIGQUERY: [
["project_id", "GCP Project to use for BigQuery"],
["google_service_account_key_path", "(Optional) GCP SA Key Path"],
[
"api_endpoint",
'(Optional) GCP BigQuery API endpoint (e.g. "https://mybq.p.googleapis.com")',
],
],
consts.SOURCE_TYPE_TERADATA: [
["host", "Desired Teradata host"],
["port", "Teradata port to connect on"],
["user_name", "User used to connect"],
["password", "Password for supplied user"],
["logmech", "(Optional) Log on mechanism"],
["use_no_lock_tables", "Use an access lock for queries (defaults to False)"],
["json_params", "(Optional) Additional teradatasql JSON string parameters"],
],
consts.SOURCE_TYPE_ORACLE: [
["host", "Desired Oracle host"],
["port", "Oracle port to connect on"],
["user", "User used to connect"],
["password", "Password for supplied user"],
["database", "Database to connect to"],
["url", "Oracle SQLAlchemy connection URL"],
],
consts.SOURCE_TYPE_MSSQL: [
["host", "Desired SQL Server host (default localhost)"],
["port", "SQL Server port to connect on (default 1433)"],
["user", "User used to connect"],
["password", "Password for supplied user"],
["database", "Database to connect to (default master)"],
["query", "Connection query parameters"],
["url", "SQL Server SQLAlchemy connection URL"],
],
consts.SOURCE_TYPE_MYSQL: [
["host", "Desired MySQL host (default localhost)"],
["port", "MySQL port to connect on (default 3306)"],
["user", "User used to connect"],
["password", "Password for supplied user"],
["database", "Database to connect to (default master)"],
],
consts.SOURCE_TYPE_SNOWFLAKE: [
["user", "Username to connect to"],
["password", "Password for authentication of user"],
["account", "Snowflake account to connect to"],
["database", "Database in snowflake to connect to"],
["connect_args", "(Optional) Additional connection arg mapping"],
],
consts.SOURCE_TYPE_POSTGRES: [
["host", "Desired PostgreSQL host."],
["port", "PostgreSQL port to connect on (e.g. 5432)"],
["user", "Username to connect to"],
["password", "Password for authentication of user"],
["database", "Database in PostgreSQL to connect to (default postgres)"],
],
consts.SOURCE_TYPE_REDSHIFT: [
["host", "Desired Redshift host."],
["port", "Redshift port to connect on (e.g. 5439)"],
["user", "Username to connect to"],
["password", "Password for authentication of user"],
["database", "Database in Redshift to connect to"],
],
consts.SOURCE_TYPE_SPANNER: [
["project_id", "GCP Project to use for Spanner"],
["instance_id", "ID of Spanner instance to connect to"],
["database_id", "ID of Spanner database (schema) to connect to"],
["google_service_account_key_path", "(Optional) GCP SA Key Path"],
[
"api_endpoint",
'(Optional) GCP Spanner API endpoint (e.g. "https://mycs.p.googleapis.com")',
],
],
consts.SOURCE_TYPE_FILESYSTEM: [
["table_name", "Table name to use as reference for file data"],
["file_path", "The local, s3, or GCS file path to the data"],
["file_type", "The file type of the file. 'csv', 'orc', 'parquet' or 'json'"],
],
consts.SOURCE_TYPE_IMPALA: [
["host", "Desired Impala host"],
["port", "Desired Impala port (10000 if not provided)"],
["database", "Desired Impala database (default if not provided)"],
["auth_mechanism", "Desired Impala auth mechanism (PLAIN if not provided)"],
[
"kerberos_service_name",
"Desired Kerberos service name ('impala' if not provided)",
],
["use_ssl", "Use SSL when connecting to HiveServer2 (default is False)"],
[
"timeout",
"Connection timeout in seconds when communicating with HiveServer2 (default is 45)",
],
[
"ca_cert",
"Local path to 3rd party CA certificate or copy of server certificate for self-signed certificates. If SSL is enabled, but this argument is None, then certificate validation is skipped.",
],
["user", "LDAP user to authenticate"],
["password", "LDAP password to authenticate"],
[
"pool_size",
"Size of the connection pool. Typically this is not necessary to configure. (default is 8)",
],
["hdfs_client", "An existing HDFS client"],
["use_http_transport", "Boolean if HTTP proxy is provided (default is False)"],
["http_path", "URL path of HTTP proxy"],
],
consts.SOURCE_TYPE_DB2: [
["host", "Desired DB2 host"],
["port", "Desired DB2 port (50000 if not provided)"],
["user", "Username to connect to"],
["password", "Password for authentication of user"],
["database", "Database in DB2 to connect to"],
["url", "URL link in DB2 to connect to"],
["driver", "Driver link in DB2 to connect to (default ibm_db_sa)"],
],
}
VALIDATE_HELP_TEXT = "Run a validation and optionally store to config"
VALIDATE_COLUMN_HELP_TEXT = "Run a column validation"
VALIDATE_ROW_HELP_TEXT = "Run a row validation"
VALIDATE_SCHEMA_HELP_TEXT = "Run a schema validation"
VALIDATE_CUSTOM_QUERY_HELP_TEXT = "Run a custom query validation"
def _check_custom_query_args(parser: argparse.ArgumentParser, parsed_args: "Namespace"):
    # Make additional checks that the arguments provided are consistent with each other.
    # For example, only one of -tbls and the custom query options can be provided.
if hasattr(parsed_args, "tables_list") and hasattr(
parsed_args, "source_query"
): # New Format
if (
parsed_args.tables_list
): # Tables_list is not None - so source and target queries all must be None
if (
parsed_args.source_query_file
or parsed_args.source_query
or parsed_args.target_query_file
or parsed_args.target_query
):
parser.error(
f"{parsed_args.command}: when --tables-list/-tbls is specified, --source-query-file/-sqf, --source-query/-sq, --target-query-file/-tqf and --target-query/-tq must not be specified"
)
else:
return
elif (parsed_args.source_query_file or parsed_args.source_query) and (
parsed_args.target_query_file or parsed_args.target_query
):
return
else:
parser.error(
f"{parsed_args.command}: Must specify both source (--source-query-file/-sqf or --source-query/-sq) and target (--target-query-file/-tqf or --target-query/-tq) - when --tables-list/-tbls is not specified"
)
else:
return # old format - only one of them is present
def get_parsed_args() -> "Namespace":
"""Return ArgParser with configured CLI arguments."""
parser = configure_arg_parser()
args = ["--help"] if len(sys.argv) == 1 else None
parsed_args = parser.parse_args(args)
_check_custom_query_args(parser, parsed_args)
return parsed_args
def configure_arg_parser():
"""Extract Args for Run."""
parser = argparse.ArgumentParser(
usage=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
)
parser.add_argument("--verbose", "-v", action="store_true", help="Verbose logging")
parser.add_argument(
"--log-level",
"-ll",
default="INFO",
choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
help="Log Level to be assigned. This will print logs with level same or above",
)
subparsers = parser.add_subparsers(dest="command")
_configure_validate_parser(subparsers)
_configure_validation_config_parser(subparsers)
_configure_connection_parser(subparsers)
_configure_find_tables(subparsers)
_configure_raw_query(subparsers)
_configure_beta_parser(subparsers)
_configure_partition_parser(subparsers)
return parser
def _configure_partition_parser(subparsers):
"""Configure arguments to generate partitioned config files."""
partition_parser = subparsers.add_parser(
"generate-table-partitions",
help=("Generate table partitions and store validation config files"),
)
optional_arguments = partition_parser.add_argument_group("optional arguments")
required_arguments = partition_parser.add_argument_group("required arguments")
_configure_row_parser(
partition_parser,
optional_arguments,
required_arguments,
is_generate_partitions=True,
)
optional_arguments.add_argument(
"--parts-per-file",
"-ppf",
type=_check_positive,
default=1,
help="Number of partitions to be validated in a single yaml file.",
)
required_arguments.add_argument(
"--config-dir",
"-cdir",
required=True,
help="Directory path to store YAML config files. "
"GCS: Provide a full gs:// path of the target directory. "
"Eg: `gs://<BUCKET>/partitons_dir`. "
"Local: Provide a relative path of the target directory. "
"Eg: `partitions_dir`",
)
required_arguments.add_argument(
"--partition-num",
"-pn",
required=True,
help="Number of partitions into which the table should be split",
type=_check_gt_one,
)
    # User can provide tables or custom queries, but not both.
    # However, since version 3.11 argparse does not support adding an argument_group to an
    # argument_group, or adding a mutually_exclusive_group or argument_group to a
    # mutually_exclusive_group.
    # We only ensure leaf level mutual exclusivity here and check higher level mutual
    # exclusivity in the code - i.e. a) when --tables-list is present, there can be no custom
    # query parameters and b) when custom query parameters are specified, both source and
    # target must be specified.
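    # For example (hypothetical values), these combinations illustrate the intent:
    #   valid:   --tables-list my_schema.my_table
    #   valid:   --source-query "SELECT ..." --target-query "SELECT ..."
    #   invalid: --tables-list my_schema.my_table --source-query "SELECT ..."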
optional_arguments.add_argument(
"--tables-list",
"-tbls",
help=(
"Comma separated tables list in the form "
"'schema.table=target_schema.target_table'"
),
)
source_mutually_exclusive = optional_arguments.add_mutually_exclusive_group()
source_mutually_exclusive.add_argument(
"--source-query-file",
"-sqf",
help="File containing the source sql query",
)
source_mutually_exclusive.add_argument(
"--source-query",
"-sq",
help="Source sql query",
)
# Group for mutually exclusive target query arguments. Either must be supplied
target_mutually_exclusive = optional_arguments.add_mutually_exclusive_group()
target_mutually_exclusive.add_argument(
"--target-query-file",
"-tqf",
help="File containing the target sql query",
)
target_mutually_exclusive.add_argument(
"--target-query",
"-tq",
help="Target sql query",
)
def _configure_beta_parser(subparsers):
"""Configure beta commands for the parser."""
connection_parser = subparsers.add_parser(
"beta", help="Run a Beta command for new utilities and features."
)
beta_subparsers = connection_parser.add_subparsers(dest="beta_cmd")
_configure_validate_parser(beta_subparsers)
_configure_deploy(beta_subparsers)
def _configure_deploy(subparsers):
"""Configure arguments for deploying as a service."""
subparsers.add_parser(
"deploy", help="Deploy Data Validation as a Service (w/ Flask)"
)
def _configure_find_tables(subparsers):
"""Configure arguments for text search table matching."""
find_tables_parser = subparsers.add_parser(
"find-tables", help="Build tables list using approx string matching."
)
find_tables_parser.add_argument(
"--source-conn", "-sc", help="Source connection name."
)
find_tables_parser.add_argument(
"--target-conn", "-tc", help="Target connection name."
)
find_tables_parser.add_argument(
"--allowed-schemas", "-as", help="List of source schemas to match."
)
find_tables_parser.add_argument(
"--include-views",
"-iv",
default=False,
action="store_true",
help="Include views in results.",
)
find_tables_parser.add_argument(
"--score-cutoff",
"-score",
type=float,
help="The minimum distance score allowed to match tables (0 to 1).",
)
def _configure_raw_query(subparsers):
"""Configure arguments for text search table matching."""
query_parser = subparsers.add_parser(
"query", help="Run an adhoc query against the supplied connection"
)
query_parser.add_argument("--conn", "-c", help="Connection name to query")
query_parser.add_argument("--query", "-q", help="Raw query to execute")
query_parser.add_argument(
"--format",
"-f",
dest="output_format",
choices=consts.RAW_QUERY_FORMAT_TYPES,
default=consts.FORMAT_TYPE_PYTHON,
help=f"Format for query output (default: {consts.FORMAT_TYPE_PYTHON})",
)
def _configure_validation_config_parser(subparsers):
"""Configure arguments to run a data validation YAML config file."""
validation_config_parser = subparsers.add_parser(
"configs", help="Run validations stored in a YAML config file"
)
configs_subparsers = validation_config_parser.add_subparsers(
dest="validation_config_cmd"
)
list_parser = configs_subparsers.add_parser(
"list", help="List your validation configs"
)
list_parser.add_argument(
"--config-dir",
"-cdir",
help="Directory path from which to list validation YAML configs.",
)
run_parser = configs_subparsers.add_parser(
"run", help="Run your validation configs"
)
run_parser.add_argument(
"--dry-run",
"-dr",
action="store_true",
help="Prints source and target SQL to stdout in lieu of performing a validation.",
)
run_parser.add_argument(
"--config-file",
"-c",
help="YAML Config File path to be used for building or running validations.",
)
run_parser.add_argument(
"--config-dir",
"-cdir",
help="Directory path containing YAML Config Files to be used for running validations.",
)
run_parser.add_argument(
"--kube-completions",
"-kc",
action="store_true",
help="When validating multiple table partitions generated by generate-table-partitions, using DVT in Kubernetes in index completion mode use this flag so that all the validations are completed",
)
get_parser = configs_subparsers.add_parser(
"get", help="Get and print a validation config"
)
get_parser.add_argument(
"--config-file",
"-c",
help="YAML Config File Path to be used for building or running validations.",
)
def _configure_connection_parser(subparsers):
"""Configure the Parser for Connection Management."""
connection_parser = subparsers.add_parser(
"connections", help="Manage & Store connections to your Databases"
)
connect_subparsers = connection_parser.add_subparsers(dest="connect_cmd")
_ = connect_subparsers.add_parser("list", help="List your connections")
add_parser = connect_subparsers.add_parser("add", help="Store a new connection")
add_parser.add_argument(
"--connection-name", "-c", help="Name of connection used as reference"
)
add_parser.add_argument(
"--secret-manager-type",
"-sm",
default=None,
help="Secret manager type to store credentials by default will be None ",
)
add_parser.add_argument(
"--secret-manager-project-id",
"-sm-prj-id",
default=None,
help="Project ID for the secret manager that stores the credentials",
)
_configure_database_specific_parsers(add_parser)
delete_parser = connect_subparsers.add_parser(
"delete", help="Delete an existing connection"
)
delete_parser.add_argument(
"--connection-name", "-c", required=True, help="Name of connection to delete"
)
describe_parser = connect_subparsers.add_parser(
"describe", help="Describe an existing connection"
)
describe_parser.add_argument(
"--connection-name", "-c", required=True, help="Name of connection to describe"
)
describe_parser.add_argument(
"--format",
"-f",
dest="output_format",
choices=["json", "yaml"],
default="yaml",
help="Output format for the configuration (default: yaml)",
)
def _configure_database_specific_parsers(parser):
"""Configure a separate subparser for each supported DB."""
subparsers = parser.add_subparsers(dest="connect_type")
raw_parser = subparsers.add_parser(
"Raw", help="Supply Raw JSON config for a connection"
)
raw_parser.add_argument("--json", "-j", help="Json string config")
for database in CONNECTION_SOURCE_FIELDS:
article = "an" if database[0].lower() in "aeiou" else "a"
db_parser = subparsers.add_parser(
database, help=f"Store {article} {database} connection"
)
for field_obj in CONNECTION_SOURCE_FIELDS[database]:
arg_field = "--" + field_obj[0].replace("_", "-")
help_txt = field_obj[1]
db_parser.add_argument(arg_field, help=help_txt)
def _configure_validate_parser(subparsers):
"""Configure arguments to run validations."""
validate_parser = subparsers.add_parser("validate", help=VALIDATE_HELP_TEXT)
validate_parser.add_argument(
"--dry-run",
"-dr",
action="store_true",
help="Prints source and target SQL to stdout in lieu of performing a validation.",
)
validate_subparsers = validate_parser.add_subparsers(dest="validate_cmd")
column_parser = validate_subparsers.add_parser(
"column", help=VALIDATE_COLUMN_HELP_TEXT
)
_configure_column_parser(column_parser)
row_parser = validate_subparsers.add_parser("row", help=VALIDATE_ROW_HELP_TEXT)
optional_arguments = row_parser.add_argument_group("optional arguments")
required_arguments = row_parser.add_argument_group("required arguments")
_configure_row_parser(row_parser, optional_arguments, required_arguments)
schema_parser = validate_subparsers.add_parser(
"schema", help=VALIDATE_SCHEMA_HELP_TEXT
)
_configure_schema_parser(schema_parser)
custom_query_parser = validate_subparsers.add_parser(
"custom-query", help=VALIDATE_CUSTOM_QUERY_HELP_TEXT
)
_configure_custom_query_parser(custom_query_parser)
def _configure_row_parser(
parser,
optional_arguments,
required_arguments,
is_generate_partitions=False,
is_custom_query=False,
):
"""Configure arguments to run row level validations."""
# Group optional arguments
optional_arguments.add_argument(
"--primary-keys",
"-pk",
help=(
"Comma separated list of primary key columns 'col_a,col_b', "
"when not specified the value will be inferred from the source or target table if available"
),
)
optional_arguments.add_argument(
"--threshold",
"-th",
type=threshold_float,
default=0.0,
help="Float max threshold for percent difference",
)
optional_arguments.add_argument(
"--exclude-columns",
"-ec",
action="store_true",
help="Flag to indicate the list of columns should be excluded from hash or concat instead of included.",
)
optional_arguments.add_argument(
"--filters",
"-filters",
type=get_filters,
default=[],
help="Filters in the format source_filter:target_filter",
)
optional_arguments.add_argument(
"--trim-string-pks",
"-tsp",
action="store_true",
help=(
"Trims string based primary key values, intended for use when one engine uses "
"padded string semantics (e.g. CHAR(n)) and the other does not (e.g. VARCHAR(n))."
),
)
optional_arguments.add_argument(
"--case-insensitive-match",
"-cim",
action="store_true",
help=(
"Performs a case insensitive match by adding an UPPER() before comparison."
),
)
optional_arguments.add_argument(
"--max-concat-columns",
"-mcc",
type=int,
help=(
"The maximum number of columns accepted by a --hash or --concat validation. When there are "
"more columns than this the validation will implicitly be split into multiple validations. "
"This option has engine specific defaults."
),
)
    # Generate-table-partitions and custom-query do not support random row
if not (is_generate_partitions or is_custom_query):
optional_arguments.add_argument(
"--use-random-row",
"-rr",
action="store_true",
help="Finds a set of random rows of the first primary key supplied.",
)
optional_arguments.add_argument(
"--random-row-batch-size",
"-rbs",
help="Row batch size used for random row filters (default 10,000).",
)
    # generate-table-partitions follows a new argument spec, specified in _configure_partition_parser,
    # where either table names or queries can be provided, but not both. If we used the same spec for
    # row and column validation, the custom-query commands could be subsumed by the validate commands
    # by specifying either table names or queries. Until then, -tbls remains a required argument for
    # validate row, validate column and validate schema.
required_arguments.add_argument(
"--tables-list",
"-tbls",
default=None,
required=True,
help="Comma separated tables list in the form 'schema.table=target_schema.target_table'",
)
# Group for mutually exclusive required arguments. Either must be supplied
mutually_exclusive_arguments = required_arguments.add_mutually_exclusive_group(
required=True
)
mutually_exclusive_arguments.add_argument(
"--hash",
"-hash",
help=(
"Comma separated list of columns for hash 'col_a,col_b' or * for "
"all columns"
),
)
mutually_exclusive_arguments.add_argument(
"--concat",
"-concat",
help=(
"Comma separated list of columns for concat 'col_a,col_b' or * "
"for all columns"
),
)
mutually_exclusive_arguments.add_argument(
"--comparison-fields",
"-comp-fields",
help=(
"Individual columns to compare. If comparing a calculated field use "
"the column alias."
),
)
_add_common_arguments(
optional_arguments,
required_arguments,
is_generate_partitions=is_generate_partitions,
)
def _configure_column_parser(column_parser):
"""Configure arguments to run column level validations."""
# Group optional arguments
optional_arguments = column_parser.add_argument_group("optional arguments")
optional_arguments.add_argument(
"--count",
"-count",
help="Comma separated list of columns for count 'col_a,col_b' or * for all columns",
)
optional_arguments.add_argument(
"--sum",
"-sum",
help="Comma separated list of columns for sum 'col_a,col_b' or * for all columns",
)
optional_arguments.add_argument(
"--avg",
"-avg",
help="Comma separated list of columns for avg 'col_a,col_b' or * for all columns",
)
optional_arguments.add_argument(
"--min",
"-min",
help="Comma separated list of columns for min 'col_a,col_b' or * for all columns",
)
optional_arguments.add_argument(
"--max",
"-max",
help="Comma separated list of columns for max 'col_a,col_b' or * for all columns",
)
optional_arguments.add_argument(
"--bit_xor",
"-bit_xor",
help="Comma separated list of columns for hashing a concatenate 'col_a,col_b' or * for all columns",
)
optional_arguments.add_argument(
"--std",
"-std",
help="Comma separated list of columns for standard deviation 'col_a,col_b' or * for all columns",
)
optional_arguments.add_argument(
"--grouped-columns",
"-gc",
help="Comma separated list of columns to use in GroupBy 'col_a,col_b'",
)
optional_arguments.add_argument(
"--exclude-columns",
"-ec",
action="store_true",
help="Flag to indicate the list of columns should be excluded from validation and not included.",
)
optional_arguments.add_argument(
"--threshold",
"-th",
type=threshold_float,
default=0.0,
help="Float max threshold for percent difference",
)
optional_arguments.add_argument(
"--filters",
"-filters",
type=get_filters,
default=[],
help="Filters in the format source_filter:target_filter",
)
optional_arguments.add_argument(
"--wildcard-include-string-len",
"-wis",
action="store_true",
help="Include string fields for wildcard aggregations.",
)
optional_arguments.add_argument(
"--wildcard-include-timestamp",
"-wit",
action="store_true",
help="Include timestamp/date fields for wildcard aggregations.",
)
optional_arguments.add_argument(
"--cast-to-bigint",
"-ctb",
action="store_true",
help="Cast any int32 fields to int64 for large aggregations.",
)
# Group required arguments
required_arguments = column_parser.add_argument_group("required arguments")
required_arguments.add_argument(
"--tables-list",
"-tbls",
default=None,
required=True,
help="Comma separated tables list in the form 'schema.table=target_schema.target_table'. Or shorthand schema.* for all tables.",
)
_add_common_arguments(optional_arguments, required_arguments)
def _configure_schema_parser(schema_parser):
"""Configure arguments to run schema level validations."""
# Group optional arguments
optional_arguments = schema_parser.add_argument_group("optional arguments")
optional_arguments.add_argument(
"--exclusion-columns",
"-ec",
help="Comma separated list of columns 'col_a,col_b' to be excluded from the schema validation",
)
optional_arguments.add_argument(
"--allow-list",
"-al",
help="Comma separated list of datatype mappings due to incompatible datatypes in source and target. e.g.: decimal(12,2):decimal(38,9),!string:string,decimal(10-18,0):int64",
)
optional_arguments.add_argument(
"--allow-list-file",
"-alf",
help="YAML file containing default --allow-list mappings. Can be used in conjunction with --allow-list. e.g.: samples/allow_list/oracle_to_bigquery.yaml or gs://dvt-allow-list-files/oracle_to_bigquery.yaml. See example files in samples/allow_list/",
)
# Group required arguments
required_arguments = schema_parser.add_argument_group("required arguments")
required_arguments.add_argument(
"--tables-list",
"-tbls",
default=None,
required=True,
help="Comma separated tables list in the form 'schema.table=target_schema.target_table'",
)
_add_common_arguments(optional_arguments, required_arguments)
def _configure_custom_query_parser(custom_query_parser):
"""Configure arguments to run custom-query validations."""
custom_query_subparsers = custom_query_parser.add_subparsers(
dest="custom_query_type"
)
# Add arguments for custom-query row parser
custom_query_row_parser = custom_query_subparsers.add_parser(
"row", help="Run a custom query row validation"
)
_configure_custom_query_row_parser(custom_query_row_parser)
# Add arguments for custom-query column parser
custom_query_column_parser = custom_query_subparsers.add_parser(
"column", help="Run a custom query column validation"
)
_configure_custom_query_column_parser(custom_query_column_parser)
def _configure_custom_query_row_parser(custom_query_row_parser):
optional_arguments = custom_query_row_parser.add_argument_group(
"optional arguments"
)
required_arguments = custom_query_row_parser.add_argument_group(
"required arguments"
)
_configure_row_parser(
custom_query_row_parser,
optional_arguments,
required_arguments,
is_custom_query=True,
)
# Group for mutually exclusive source query arguments. Either must be supplied
source_mutually_exclusive = required_arguments.add_mutually_exclusive_group(
required=True
)
source_mutually_exclusive.add_argument(
"--source-query-file",
"-sqf",
help="File containing the source sql query",
)
source_mutually_exclusive.add_argument(
"--source-query",
"-sq",
help="Source sql query",
)
# Group for mutually exclusive target query arguments. Either must be supplied
target_mutually_exclusive = required_arguments.add_mutually_exclusive_group(
required=True
)
target_mutually_exclusive.add_argument(
"--target-query-file",
"-tqf",
help="File containing the target sql query",
)
target_mutually_exclusive.add_argument(
"--target-query",
"-tq",
help="Target sql query",
)
def _configure_custom_query_column_parser(custom_query_column_parser):
# Group optional arguments
optional_arguments = custom_query_column_parser.add_argument_group(
"optional arguments"
)
optional_arguments.add_argument(
"--count",
"-count",
help="Comma separated list of columns for count 'col_a,col_b' or * for all columns",
)
optional_arguments.add_argument(
"--sum",
"-sum",
help="Comma separated list of columns for sum 'col_a,col_b' or * for all columns",
)
optional_arguments.add_argument(
"--avg",
"-avg",
help="Comma separated list of columns for avg 'col_a,col_b' or * for all columns",
)
optional_arguments.add_argument(
"--min",
"-min",
help="Comma separated list of columns for min 'col_a,col_b' or * for all columns",
)
optional_arguments.add_argument(
"--max",
"-max",
help="Comma separated list of columns for max 'col_a,col_b' or * for all columns",
)
optional_arguments.add_argument(
"--bit_xor",
"-bit_xor",
help="Comma separated list of columns for hashing a concatenate 'col_a,col_b' or * for all columns",
)
optional_arguments.add_argument(
"--std",
"-std",
help="Comma separated list of columns for standard deviation 'col_a,col_b' or * for all columns",
)
optional_arguments.add_argument(
"--exclude-columns",
"-ec",
action="store_true",
help="Flag to indicate the list of columns should be excluded from validation and not included.",
)
optional_arguments.add_argument(
"--wildcard-include-string-len",
"-wis",
action="store_true",
help="Include string fields for wildcard aggregations.",
)
optional_arguments.add_argument(
"--wildcard-include-timestamp",
"-wit",
action="store_true",
help="Include timestamp/date fields for wildcard aggregations.",
)
optional_arguments.add_argument(
"--cast-to-bigint",
"-ctb",
action="store_true",
help="Cast any int32 fields to int64 for large aggregations.",
)
optional_arguments.add_argument(
"--filters",
"-filters",
type=get_filters,
default=[],
help="Filters in the format source_filter:target_filter",
)
optional_arguments.add_argument(
"--threshold",
"-th",
type=threshold_float,
default=0.0,
help="Float max threshold for percent difference",
)
# Group required arguments
required_arguments = custom_query_column_parser.add_argument_group(
"required arguments"
)
# Group for mutually exclusive source query arguments. Either must be supplied
source_mutually_exclusive = required_arguments.add_mutually_exclusive_group(
required=True
)
source_mutually_exclusive.add_argument(
"--source-query-file",
"-sqf",
help="File containing the source sql query",
)
source_mutually_exclusive.add_argument(
"--source-query",
"-sq",
help="Source sql query",
)
# Group for mutually exclusive target query arguments. Either must be supplied
target_mutually_exclusive = required_arguments.add_mutually_exclusive_group(
required=True
)
target_mutually_exclusive.add_argument(
"--target-query-file",
"-tqf",
help="File containing the target sql query",
)
target_mutually_exclusive.add_argument(
"--target-query",
"-tq",
help="Target sql query",
)
_add_common_arguments(optional_arguments, required_arguments)
def _add_common_arguments(
optional_arguments, required_arguments, is_generate_partitions=False
):
# Group all Required Arguments together
required_arguments.add_argument(
"--source-conn", "-sc", required=True, help="Source connection name"
)
required_arguments.add_argument(
"--target-conn", "-tc", required=True, help="Target connection name"
)
# Optional arguments
optional_arguments.add_argument(
"--bq-result-handler", "-bqrh", help=argparse.SUPPRESS
)
optional_arguments.add_argument(
"--result-handler",
"-rh",
help=(
"Result handler connection details. "
"CONNECTION_NAME.SCHEMA.TABLE or BQ_PROJECT_ID.DATASET.TABLE."
),
)
optional_arguments.add_argument(
"--labels", "-l", help="Key value pair labels for validation run"
)
optional_arguments.add_argument(
"--service-account",
"-sa",
help="Path to SA key file for result handler output",
)
if not is_generate_partitions:
optional_arguments.add_argument(
"--config-file",
"-c",
help="Store the validation config in the YAML File Path specified",
)
optional_arguments.add_argument(
"--config-file-json",
"-cj",
help="Store the validation config in the JSON File Path specified to be used for application use cases",
)
optional_arguments.add_argument(
"--format",
"-fmt",
default=consts.FORMAT_TYPE_TABLE,
help="Set the format for printing command output, Supported formats are (text, csv, json, table). Defaults "
"to table",
)
optional_arguments.add_argument(
"--filter-status",
"-fs",
# TODO: update if we start to support other statuses
help="Comma separated list of statuses to filter the validation results. Supported statuses are (success, fail). If no list is provided, all statuses are returned",
)
optional_arguments.add_argument(
"--run-id",
"-rid",
default=None,
help="Set a string for the run_id, if None is input then a randomly generated UUID will be used, which is the default behaviour",
)
def _check_positive(value: int, lower_bound: int = 1) -> int:
ivalue = int(value)
if ivalue < lower_bound:
if lower_bound == 1:
raise argparse.ArgumentTypeError(
f"{value} is an invalid positive int value"
)
else:
raise argparse.ArgumentTypeError(f"Value must be >= {lower_bound}: {value}")
return ivalue
def _check_gt_one(value: int) -> int:
return _check_positive(value, lower_bound=2)
def check_no_yaml_files(partition_num: int, parts_per_file: int):
"""Check that number of yaml files generated is less than 10,001
Will be invoked after all the arguments are processed."""
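    # For example (hypothetical values): partition_num=20000 with parts_per_file=1 would
    # produce 20,000 yaml files and is rejected, while parts_per_file=2 produces 10,000
    # files and is accepted.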
if math.ceil(partition_num / parts_per_file) < 10001:
return
else:
raise argparse.ArgumentTypeError(
f"partition-num={partition_num} results in more than the maximum number of yaml files (i.e. 10,000). Reduce the number of yaml files by using the --parts-per-file argument or decreasing the number of partitions."
)
def get_connection_config_from_args(args):
"""Return dict with connection config supplied."""
config = {
consts.SOURCE_TYPE: args.connect_type,
consts.SECRET_MANAGER_TYPE: getattr(args, consts.SECRET_MANAGER_TYPE),
consts.SECRET_MANAGER_PROJECT_ID: getattr(
args, consts.SECRET_MANAGER_PROJECT_ID
),
}
if args.connect_type == "Raw":
return json.loads(args.json)
for field_obj in CONNECTION_SOURCE_FIELDS[args.connect_type]:
field = field_obj[0]
if getattr(args, field) is None:
continue
config[field] = getattr(args, field)
return config
def threshold_float(x):
"""Restrict threshold arg to be a positive float."""
try:
x = float(x)
except ValueError:
raise argparse.ArgumentTypeError("%r not a floating-point literal" % (x,))
if x < 0.0 or x > sys.float_info.max:
raise argparse.ArgumentTypeError(
"%r must be positive and below the max float value" % (x,)
)
elif x != x:
raise argparse.ArgumentTypeError("%r must be a number" % (x,))
return x
def _generate_random_name(conn):
name = f"{conn[consts.SOURCE_TYPE]}_{str(uuid.uuid4())}"
return name
def store_connection(connection_name, conn):
"""Store the connection config under the given name."""
mgr = state_manager.StateManager()
mgr.create_connection(connection_name, conn)
def delete_connection(connection_name):
"""Delete the connection config under the given name."""
mgr = state_manager.StateManager()
mgr.delete_connection(connection_name)
def list_connections():
"""List all saved connections."""
mgr = state_manager.StateManager()
connections = mgr.list_connections()
for conn_name in connections:
source_type = mgr.get_connection_config(conn_name).get("source_type")
print(f"Connection Name: {conn_name}")
print(f"Source Type: {source_type}\n")
return connections
def describe_connection(connection_name, output_format):
"""Return yaml connection details for a specific connection"""
mgr = state_manager.StateManager()
connection_details = mgr.describe_connection(connection_name, output_format)
print(connection_details)
return connection_details
def get_connection(connection_name):
"""Return dict connection details for a specific connection."""
mgr = state_manager.StateManager()
return mgr.get_connection_config(connection_name)
def store_validation(validation_file_name, config, include_log=True):
"""Store the validation config under the given name."""
validation_path = gcs_helper.get_validation_path(validation_file_name)
if validation_file_name.endswith(".yaml"):
config_str = dump(config, Dumper=Dumper)
elif validation_file_name.endswith("json"):
config_str = json.dumps(config)
else:
raise ValueError(f"Invalid validation file name: {validation_file_name}")
gcs_helper.write_file(validation_path, config_str, include_log=include_log)
def get_validation(name: str, config_dir: str = None):
"""Return validation YAML config."""
if config_dir:
validation_path = os.path.join(config_dir, name)
else:
validation_path = gcs_helper.get_validation_path(name)
validation_bytes = gcs_helper.read_file(validation_path)
return load(validation_bytes, Loader=Loader)
def list_validations(config_dir="./"):
"""List all saved validation YAMLs in a directory."""
logging.info(f"Looking for validations in path {config_dir}")
if gcs_helper._is_gcs_path(config_dir):
if not config_dir.endswith("/"):
config_dir += "/"
files = gcs_helper.list_gcs_directory(config_dir)
else:
files = os.listdir(config_dir)
return [file_name for file_name in files if file_name.endswith(".yaml")]
def print_validations_in_dir(config_dir="./"):
validations = list_validations(config_dir=config_dir)
logging.info("Validation YAMLs found:")
for validation_name in validations:
logging.info(validation_name)
def get_labels(arg_labels):
"""Return list of tuples representing key-value label pairs."""
labels = []
if arg_labels:
pairs = arg_labels.split(",")
for pair in pairs:
kv = pair.split("=")
if len(kv) == 2:
labels.append((kv[0], kv[1]))
else:
raise ValueError("Labels must be comma-separated key-value pairs.")
return labels
def get_filters(filter_value: str) -> List[Dict]:
"""Returns filters for source and target from --filters argument.
A filter is the condition that is used in a SQL WHERE clause.
If only one filter is specified, it applies to both source and target
For a doc on regular expression for filters see docs/internal/filters_regex.md
"""
filters = util.split_not_in_quotes(filter_value, ":")
if len(filters) not in (1, 2):
raise argparse.ArgumentTypeError("Unable to parse filter arguments.")
filters = [_.strip() for _ in filters]
if len(filters) == 1:
if not filters[0]:
raise argparse.ArgumentTypeError("Empty string not allowed in filter")
filter_dict = {
"type": "custom",
"source": filters[0],
"target": filters[0],
}
elif len(filters) == 2:
if not filters[0] or not filters[1]:
raise argparse.ArgumentTypeError("Empty string not allowed in filter")
filter_dict = {
"type": "custom",
"source": filters[0],
"target": filters[1],
}
filter_config = [
filter_dict,
]
return filter_config
def _get_result_handler(rc_value: str, sa_file=None) -> dict:
"""Returns dict of result handler config. Backwards compatible for JSON input.
rc_value (str): Result config argument specified.
sa_file (str): SA path argument specified.
"""
config = rc_value.split(".", 1)
if len(config) != 2:
raise ValueError(f"Unable to parse result handler config: `{rc_value}`")
# Check if the first part of the result handler is a connection name.
mgr = state_manager.StateManager()
connections = mgr.list_connections()
if config[0] in connections:
# We received connection_name.results_table.
conn_from_file = get_connection(config[0])
if conn_from_file[consts.SOURCE_TYPE] == consts.SOURCE_TYPE_BIGQUERY:
result_handler = {
consts.RH_TYPE: conn_from_file[consts.SOURCE_TYPE],
consts.PROJECT_ID: conn_from_file["project_id"],
consts.TABLE_ID: config[1],
consts.API_ENDPOINT: conn_from_file.get("api_endpoint", None),
}
elif conn_from_file[consts.SOURCE_TYPE] == consts.SOURCE_TYPE_POSTGRES:
result_handler = {
consts.RH_TYPE: conn_from_file[consts.SOURCE_TYPE],
consts.TABLE_ID: config[1],
consts.RH_CONN: conn_from_file,
}
        # TODO: Add filesystem handler too.
else:
raise exceptions.ResultHandlerException(
f"Unsupported result handler connection type: {conn_from_file[consts.SOURCE_TYPE]}"
)
else:
# We received legacy format "bq-project-name.bq_results_table".
result_handler = {
consts.RH_TYPE: consts.SOURCE_TYPE_BIGQUERY,
consts.PROJECT_ID: config[0],
consts.TABLE_ID: config[1],
}
if sa_file:
result_handler[consts.GOOGLE_SERVICE_ACCOUNT_KEY_PATH] = sa_file
return result_handler
def get_arg_list(arg_value, default_value=None):
"""Returns list of values from argument provided. Backwards compatible for JSON input.
arg_value (str): Argument supplied
default_value (Any): A default value to supply when arg_value is empty.
"""
if not arg_value:
return default_value
return _read_json_value(arg_value) or arg_value.split(",")
def _read_json_value(arg_value: str) -> list:
"""Returns a deserialized JSON value or None if an error occurs."""
try:
if isinstance(arg_value, list):
arg_value = str(arg_value)
return json.loads(arg_value)
except json.decoder.JSONDecodeError:
return None
def get_tables_list(arg_tables, default_value=None, is_filesystem=False):
"""Returns dictionary of tables. Backwards compatible for JSON input.
arg_table (str): tables_list argument specified
default_value (Any): A default value to supply when arg_value is empty.
is_filesystem (boolean): Boolean indicating whether source connection is a FileSystem. In this case, a schema is not required.
"""
if not arg_tables:
return default_value
json_tables_list = _read_json_value(arg_tables)
if json_tables_list:
return json_tables_list
tables_list = []
tables_mapping = list(csv.reader([arg_tables]))[0]
source_schema_required = bool(not is_filesystem)
for mapping in tables_mapping:
tables_map = mapping.split("=")
if len(tables_map) == 1:
schema, table = split_table(
tables_map, schema_required=source_schema_required
)
table_dict = {
"schema_name": schema,
"table_name": table,
}
elif len(tables_map) == 2:
src_schema, src_table = split_table(
[tables_map[0]], schema_required=source_schema_required
)
table_dict = {
"schema_name": src_schema,
"table_name": src_table,
}
targ_schema, targ_table = split_table(
[tables_map[1]], schema_required=False
)
if targ_schema:
table_dict["target_schema_name"] = targ_schema
table_dict["target_table_name"] = targ_table
else:
raise ValueError(
"Unable to parse tables list. Please provide valid mapping."
)
tables_list.append(table_dict)
return tables_list
def split_table(table_ref, schema_required=True):
"""Returns schema and table name given list of input values.
    table_ref (List): Table reference, e.g. ['my.schema.my_table']
schema_required (boolean): Indicates whether schema is required. A source
table reference requires schema. A target table reference does not.
"""
table_ref_list = list(csv.reader(table_ref, delimiter=".", quotechar='"'))[0]
if len(table_ref_list) == 1 and schema_required:
raise ValueError("Please provide schema in tables list.")
elif len(table_ref_list) == 1:
return None, table_ref_list[0].strip()
table = table_ref_list.pop()
schema = ".".join(table_ref_list)
return schema.strip(), table.strip()
def get_query_from_file(filename):
"""Return query from input file"""
query = ""
try:
query = gcs_helper.read_file(filename, download_as_text=True)
query = query.rstrip(";\n")
except IOError:
logging.error("Cannot read query file: ", filename)
if not query or query.isspace():
raise ValueError(
"Expected file with sql query, got empty file or file with white spaces. "
f"input file: {filename}"
)
return query
def get_query_from_inline(inline_query):
"""Return query from inline query arg"""
query = inline_query.strip()
query = query.rstrip(";\n")
if not query or query.isspace():
raise ValueError(
"Expected arg with sql query, got empty arg or arg with white "
f"spaces. input query: '{inline_query}'"
)
return query
def get_query_from_query_args(query_str_arg, query_file_arg) -> str:
if query_str_arg:
return get_query_from_inline(query_str_arg)
else:
return get_query_from_file(query_file_arg)
def _max_concat_columns(
max_concat_columns_option: int, source_client, target_client
) -> Optional[int]:
"""Determine any upper limit on number of columns allowed into concat() operation."""
if max_concat_columns_option:
# User specified limit takes precedence.
return max_concat_columns_option
else:
source_max = consts.MAX_CONCAT_COLUMNS_DEFAULTS.get(source_client.name, None)
target_max = consts.MAX_CONCAT_COLUMNS_DEFAULTS.get(target_client.name, None)
if source_max and target_max:
return min(source_max, target_max)
else:
return source_max or target_max
def _concat_column_count_configs(
cols: list,
pre_build_configs: dict,
arg_to_override: str,
max_col_count: int,
) -> list:
"""
Ensure we don't have too many columns for the engines involved.
https://github.com/GoogleCloudPlatform/professional-services-data-validator/issues/1216
"""
return_list = []
if max_col_count and len(cols) > max_col_count:
for col_chunk in list_to_sublists(cols, max_col_count):
col_csv = ",".join(col_chunk)
pre_build_configs_copy = copy.copy(pre_build_configs)
pre_build_configs_copy[arg_to_override] = col_csv
return_list.append(pre_build_configs_copy)
else:
return_list.append(pre_build_configs)
return return_list
def get_pre_build_configs(args: "Namespace", validate_cmd: str) -> List[Dict]:
"""Return a dict of configurations to build ConfigManager object"""
def cols_from_arg(concat_arg: str, client, table_obj: dict, query_str: str) -> list:
if concat_arg == "*":
# If validating with "*" then we need to expand to count the columns.
if table_obj:
return clients.get_ibis_table_schema(
client,
table_obj["schema_name"],
table_obj["table_name"],
).names
else:
return clients.get_ibis_query_schema(
client,
query_str,
).names
else:
return get_arg_list(concat_arg)
    # validate_cmd is set to 'Row' or 'Custom-query' when invoked by generate-table-partitions,
    # depending on what is being partitioned. Otherwise validate_cmd is None.
if validate_cmd is None:
validate_cmd = args.validate_cmd.capitalize()
if validate_cmd == "Schema":
config_type = consts.SCHEMA_VALIDATION
elif validate_cmd == "Column":
config_type = consts.COLUMN_VALIDATION
elif validate_cmd == "Row":
config_type = consts.ROW_VALIDATION
elif validate_cmd == "Custom-query":
config_type = consts.CUSTOM_QUERY
else:
raise ValueError(f"Unknown Validation Type: {validate_cmd}")
# Cater for legacy -bqrh.
args.result_handler = args.result_handler or args.bq_result_handler
# Get result handler config
if args.result_handler:
result_handler_config = _get_result_handler(
args.result_handler, args.service_account
)
else:
result_handler_config = None
# Set filter_config and threshold. Not supported in case of schema validation
filter_config = getattr(args, consts.CONFIG_FILTERS, [])
threshold = getattr(args, consts.CONFIG_THRESHOLD, 0.0)
# Get labels
if args.labels is None:
labels = []
else:
labels = get_labels(args.labels)
# Get source and target clients
mgr = state_manager.StateManager()
source_client = clients.get_data_client(mgr.get_connection_config(args.source_conn))
target_client = clients.get_data_client(mgr.get_connection_config(args.target_conn))
# Get format: text, csv, json, table. Default is table
format = args.format if args.format else consts.FORMAT_TYPE_TABLE
    # Get random row arguments. These attributes are only present for row validations.
# Bad coding here, but keeping it so as not to introduce a breaking change. See
# consts.py Line 17 for a more detailed explanation.
use_random_rows = getattr(args, "use_random_row", False)
random_row_batch_size = getattr(args, consts.CONFIG_RANDOM_ROW_BATCH_SIZE, None)
# Get table list. Not supported in case of custom query validation
is_filesystem = source_client._source_type == "FileSystem"
query_str = None
if config_type == consts.CUSTOM_QUERY:
tables_list = get_tables_list(
None, default_value=[{}], is_filesystem=is_filesystem
)
query_str = get_query_from_query_args(args.source_query, args.source_query_file)
else:
tables_list = get_tables_list(
args.tables_list, default_value=[{}], is_filesystem=is_filesystem
)
# Get validation filter status: success, fail
if args.filter_status is not None:
arg_list = get_arg_list(args.filter_status)
if all(arg in consts.VALIDATION_STATUSES for arg in arg_list):
filter_status = arg_list
else:
raise ValueError("An unsupported status was provided")
else:
filter_status = None
pre_build_configs_list = []
if config_type != consts.CUSTOM_QUERY:
tables_list = find_tables.expand_tables_of_asterisk(
tables_list, source_client, target_client
)
for table_obj in tables_list:
pre_build_configs = {
"config_type": config_type,
consts.CONFIG_SOURCE_CONN_NAME: args.source_conn,
consts.CONFIG_TARGET_CONN_NAME: args.target_conn,
"table_obj": table_obj,
consts.CONFIG_LABELS: labels,
consts.CONFIG_THRESHOLD: threshold,
consts.CONFIG_FORMAT: format,
consts.CONFIG_USE_RANDOM_ROWS: use_random_rows,
consts.CONFIG_RANDOM_ROW_BATCH_SIZE: random_row_batch_size,
"source_client": source_client,
"target_client": target_client,
"result_handler_config": result_handler_config,
"filter_config": filter_config,
consts.CONFIG_FILTER_STATUS: filter_status,
consts.CONFIG_TRIM_STRING_PKS: getattr(
args, consts.CONFIG_TRIM_STRING_PKS, False
),
consts.CONFIG_CASE_INSENSITIVE_MATCH: getattr(
args, consts.CONFIG_CASE_INSENSITIVE_MATCH, False
),
consts.CONFIG_ROW_CONCAT: getattr(args, consts.CONFIG_ROW_CONCAT, None),
consts.CONFIG_ROW_HASH: getattr(args, consts.CONFIG_ROW_HASH, None),
consts.CONFIG_RUN_ID: getattr(args, consts.CONFIG_RUN_ID, None),
"verbose": args.verbose,
}
if (
pre_build_configs[consts.CONFIG_ROW_CONCAT]
or pre_build_configs[consts.CONFIG_ROW_HASH]
):
# Ensure we don't have too many columns for the engines involved.
cols = cols_from_arg(
pre_build_configs[consts.CONFIG_ROW_HASH]
or pre_build_configs[consts.CONFIG_ROW_CONCAT],
source_client,
table_obj,
query_str,
)
new_pre_build_configs = _concat_column_count_configs(
cols,
pre_build_configs,
consts.CONFIG_ROW_HASH if args.hash else consts.CONFIG_ROW_CONCAT,
_max_concat_columns(
args.max_concat_columns, source_client, target_client
),
)
if len(new_pre_build_configs) > 1:
message_type = (
f'{table_obj["schema_name"]}.{table_obj["table_name"]}'
if table_obj
else "custom query"
)
logging.info(
f"Splitting validation into {len(new_pre_build_configs)} queries for {message_type}"
)
pre_build_configs_list.extend(new_pre_build_configs)
else:
pre_build_configs_list.append(pre_build_configs)
return pre_build_configs_list