data_validation/cli_tools.py (1,307 lines of code) (raw):

# Copyright 2020 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """The Data Validation CLI tool is intended to help to build and execute data validation runs with ease. The Data Validator can be called either using: data-validation -h python -m data_validation -h ex. Step 1) Store Connection to be used in validation data-validation connections add -c my_bq_conn BigQuery --project-id pso-project Step 2) Run Validation using supplied connections data-validation validate column -sc my_bq_conn -tc my_bq_conn \ -tbls bigquery-public-data.new_york_citibike.citibike_trips,bigquery-public-data.new_york_citibike.citibike_stations \ --sum '*' --count '*' python -m data_validation validate column -sc my_bq_conn -tc my_bq_conn \ -tbls bigquery-public-data.new_york_citibike.citibike_trips \ --grouped-columns starttime \ --sum tripduration --count tripduration data-validation validate column \ -sc my_bq_conn -tc my_bq_conn \ -tbls bigquery-public-data.new_york_citibike.citibike_trips,bigquery-public-data.new_york_citibike.citibike_stations \ --sum tripduration,start_station_name --count tripduration,start_station_name \ -rh pso-project.pso_data_validator.results \ -c ex_yaml.yaml command: data-validation """ import argparse import copy import csv import json import logging import sys import uuid import os import math from typing import Dict, List, Optional, TYPE_CHECKING from yaml import Dumper, Loader, dump, load from data_validation import ( clients, consts, exceptions, find_tables, state_manager, gcs_helper, util, ) from data_validation.validation_builder import list_to_sublists if TYPE_CHECKING: from argparse import Namespace CONNECTION_SOURCE_FIELDS = { consts.SOURCE_TYPE_BIGQUERY: [ ["project_id", "GCP Project to use for BigQuery"], ["google_service_account_key_path", "(Optional) GCP SA Key Path"], [ "api_endpoint", '(Optional) GCP BigQuery API endpoint (e.g. "https://mybq.p.googleapis.com")', ], ], consts.SOURCE_TYPE_TERADATA: [ ["host", "Desired Teradata host"], ["port", "Teradata port to connect on"], ["user_name", "User used to connect"], ["password", "Password for supplied user"], ["logmech", "(Optional) Log on mechanism"], ["use_no_lock_tables", "Use an access lock for queries (defaults to False)"], ["json_params", "(Optional) Additional teradatasql JSON string parameters"], ], consts.SOURCE_TYPE_ORACLE: [ ["host", "Desired Oracle host"], ["port", "Oracle port to connect on"], ["user", "User used to connect"], ["password", "Password for supplied user"], ["database", "Database to connect to"], ["url", "Oracle SQLAlchemy connection URL"], ], consts.SOURCE_TYPE_MSSQL: [ ["host", "Desired SQL Server host (default localhost)"], ["port", "SQL Server port to connect on (default 1433)"], ["user", "User used to connect"], ["password", "Password for supplied user"], ["database", "Database to connect to (default master)"], ["query", "Connection query parameters"], ["url", "SQL Server SQLAlchemy connection URL"], ], consts.SOURCE_TYPE_MYSQL: [ ["host", "Desired MySQL host (default localhost)"], ["port", "MySQL port to connect on (default 3306)"], ["user", "User used to connect"], ["password", "Password for supplied user"], ["database", "Database to connect to (default master)"], ], consts.SOURCE_TYPE_SNOWFLAKE: [ ["user", "Username to connect to"], ["password", "Password for authentication of user"], ["account", "Snowflake account to connect to"], ["database", "Database in snowflake to connect to"], ["connect_args", "(Optional) Additional connection arg mapping"], ], consts.SOURCE_TYPE_POSTGRES: [ ["host", "Desired PostgreSQL host."], ["port", "PostgreSQL port to connect on (e.g. 5432)"], ["user", "Username to connect to"], ["password", "Password for authentication of user"], ["database", "Database in PostgreSQL to connect to (default postgres)"], ], consts.SOURCE_TYPE_REDSHIFT: [ ["host", "Desired Redshift host."], ["port", "Redshift port to connect on (e.g. 5439)"], ["user", "Username to connect to"], ["password", "Password for authentication of user"], ["database", "Database in Redshift to connect to"], ], consts.SOURCE_TYPE_SPANNER: [ ["project_id", "GCP Project to use for Spanner"], ["instance_id", "ID of Spanner instance to connect to"], ["database_id", "ID of Spanner database (schema) to connect to"], ["google_service_account_key_path", "(Optional) GCP SA Key Path"], [ "api_endpoint", '(Optional) GCP Spanner API endpoint (e.g. "https://mycs.p.googleapis.com")', ], ], consts.SOURCE_TYPE_FILESYSTEM: [ ["table_name", "Table name to use as reference for file data"], ["file_path", "The local, s3, or GCS file path to the data"], ["file_type", "The file type of the file. 'csv', 'orc', 'parquet' or 'json'"], ], consts.SOURCE_TYPE_IMPALA: [ ["host", "Desired Impala host"], ["port", "Desired Impala port (10000 if not provided)"], ["database", "Desired Impala database (default if not provided)"], ["auth_mechanism", "Desired Impala auth mechanism (PLAIN if not provided)"], [ "kerberos_service_name", "Desired Kerberos service name ('impala' if not provided)", ], ["use_ssl", "Use SSL when connecting to HiveServer2 (default is False)"], [ "timeout", "Connection timeout in seconds when communicating with HiveServer2 (default is 45)", ], [ "ca_cert", "Local path to 3rd party CA certificate or copy of server certificate for self-signed certificates. If SSL is enabled, but this argument is None, then certificate validation is skipped.", ], ["user", "LDAP user to authenticate"], ["password", "LDAP password to authenticate"], [ "pool_size", "Size of the connection pool. Typically this is not necessary to configure. (default is 8)", ], ["hdfs_client", "An existing HDFS client"], ["use_http_transport", "Boolean if HTTP proxy is provided (default is False)"], ["http_path", "URL path of HTTP proxy"], ], consts.SOURCE_TYPE_DB2: [ ["host", "Desired DB2 host"], ["port", "Desired DB2 port (50000 if not provided)"], ["user", "Username to connect to"], ["password", "Password for authentication of user"], ["database", "Database in DB2 to connect to"], ["url", "URL link in DB2 to connect to"], ["driver", "Driver link in DB2 to connect to (default ibm_db_sa)"], ], } VALIDATE_HELP_TEXT = "Run a validation and optionally store to config" VALIDATE_COLUMN_HELP_TEXT = "Run a column validation" VALIDATE_ROW_HELP_TEXT = "Run a row validation" VALIDATE_SCHEMA_HELP_TEXT = "Run a schema validation" VALIDATE_CUSTOM_QUERY_HELP_TEXT = "Run a custom query validation" def _check_custom_query_args(parser: argparse.ArgumentParser, parsed_args: "Namespace"): # This is where we make additional checks if the arguments provided are what we expect # For example, only one of -tbls and custom query options can be provided if hasattr(parsed_args, "tables_list") and hasattr( parsed_args, "source_query" ): # New Format if ( parsed_args.tables_list ): # Tables_list is not None - so source and target queries all must be None if ( parsed_args.source_query_file or parsed_args.source_query or parsed_args.target_query_file or parsed_args.target_query ): parser.error( f"{parsed_args.command}: when --tables-list/-tbls is specified, --source-query-file/-sqf, --source-query/-sq, --target-query-file/-tqf and --target-query/-tq must not be specified" ) else: return elif (parsed_args.source_query_file or parsed_args.source_query) and ( parsed_args.target_query_file or parsed_args.target_query ): return else: parser.error( f"{parsed_args.command}: Must specify both source (--source-query-file/-sqf or --source-query/-sq) and target (--target-query-file/-tqf or --target-query/-tq) - when --tables-list/-tbls is not specified" ) else: return # old format - only one of them is present def get_parsed_args() -> "Namespace": """Return ArgParser with configured CLI arguments.""" parser = configure_arg_parser() args = ["--help"] if len(sys.argv) == 1 else None parsed_args = parser.parse_args(args) _check_custom_query_args(parser, parsed_args) return parsed_args def configure_arg_parser(): """Extract Args for Run.""" parser = argparse.ArgumentParser( usage=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter ) parser.add_argument("--verbose", "-v", action="store_true", help="Verbose logging") parser.add_argument( "--log-level", "-ll", default="INFO", choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], help="Log Level to be assigned. This will print logs with level same or above", ) subparsers = parser.add_subparsers(dest="command") _configure_validate_parser(subparsers) _configure_validation_config_parser(subparsers) _configure_connection_parser(subparsers) _configure_find_tables(subparsers) _configure_raw_query(subparsers) _configure_beta_parser(subparsers) _configure_partition_parser(subparsers) return parser def _configure_partition_parser(subparsers): """Configure arguments to generate partitioned config files.""" partition_parser = subparsers.add_parser( "generate-table-partitions", help=("Generate table partitions and store validation config files"), ) optional_arguments = partition_parser.add_argument_group("optional arguments") required_arguments = partition_parser.add_argument_group("required arguments") _configure_row_parser( partition_parser, optional_arguments, required_arguments, is_generate_partitions=True, ) optional_arguments.add_argument( "--parts-per-file", "-ppf", type=_check_positive, default=1, help="Number of partitions to be validated in a single yaml file.", ) required_arguments.add_argument( "--config-dir", "-cdir", required=True, help="Directory path to store YAML config files. " "GCS: Provide a full gs:// path of the target directory. " "Eg: `gs://<BUCKET>/partitons_dir`. " "Local: Provide a relative path of the target directory. " "Eg: `partitions_dir`", ) required_arguments.add_argument( "--partition-num", "-pn", required=True, help="Number of partitions into which the table should be split", type=_check_gt_one, ) # User can provide tables or custom queries, but not both # However, Argparse does not support adding an argument_group to an argument_group or adding a # mutually_exclusive_group or argument_group to a mutually_exclusive_group since version 3.11. # We are only ensuring leaf level mutual exclusivity here and will need to check higher level # mutual exclusivity in the code - i.e. a) when --tables-list is present, there can be no custom # query parameters and b) when custom query parameters are specified, both source and target must be # specified. optional_arguments.add_argument( "--tables-list", "-tbls", help=( "Comma separated tables list in the form " "'schema.table=target_schema.target_table'" ), ) source_mutually_exclusive = optional_arguments.add_mutually_exclusive_group() source_mutually_exclusive.add_argument( "--source-query-file", "-sqf", help="File containing the source sql query", ) source_mutually_exclusive.add_argument( "--source-query", "-sq", help="Source sql query", ) # Group for mutually exclusive target query arguments. Either must be supplied target_mutually_exclusive = optional_arguments.add_mutually_exclusive_group() target_mutually_exclusive.add_argument( "--target-query-file", "-tqf", help="File containing the target sql query", ) target_mutually_exclusive.add_argument( "--target-query", "-tq", help="Target sql query", ) def _configure_beta_parser(subparsers): """Configure beta commands for the parser.""" connection_parser = subparsers.add_parser( "beta", help="Run a Beta command for new utilities and features." ) beta_subparsers = connection_parser.add_subparsers(dest="beta_cmd") _configure_validate_parser(beta_subparsers) _configure_deploy(beta_subparsers) def _configure_deploy(subparsers): """Configure arguments for deploying as a service.""" subparsers.add_parser( "deploy", help="Deploy Data Validation as a Service (w/ Flask)" ) def _configure_find_tables(subparsers): """Configure arguments for text search table matching.""" find_tables_parser = subparsers.add_parser( "find-tables", help="Build tables list using approx string matching." ) find_tables_parser.add_argument( "--source-conn", "-sc", help="Source connection name." ) find_tables_parser.add_argument( "--target-conn", "-tc", help="Target connection name." ) find_tables_parser.add_argument( "--allowed-schemas", "-as", help="List of source schemas to match." ) find_tables_parser.add_argument( "--include-views", "-iv", default=False, action="store_true", help="Include views in results.", ) find_tables_parser.add_argument( "--score-cutoff", "-score", type=float, help="The minimum distance score allowed to match tables (0 to 1).", ) def _configure_raw_query(subparsers): """Configure arguments for text search table matching.""" query_parser = subparsers.add_parser( "query", help="Run an adhoc query against the supplied connection" ) query_parser.add_argument("--conn", "-c", help="Connection name to query") query_parser.add_argument("--query", "-q", help="Raw query to execute") query_parser.add_argument( "--format", "-f", dest="output_format", choices=consts.RAW_QUERY_FORMAT_TYPES, default=consts.FORMAT_TYPE_PYTHON, help=f"Format for query output (default: {consts.FORMAT_TYPE_PYTHON})", ) def _configure_validation_config_parser(subparsers): """Configure arguments to run a data validation YAML config file.""" validation_config_parser = subparsers.add_parser( "configs", help="Run validations stored in a YAML config file" ) configs_subparsers = validation_config_parser.add_subparsers( dest="validation_config_cmd" ) list_parser = configs_subparsers.add_parser( "list", help="List your validation configs" ) list_parser.add_argument( "--config-dir", "-cdir", help="Directory path from which to list validation YAML configs.", ) run_parser = configs_subparsers.add_parser( "run", help="Run your validation configs" ) run_parser.add_argument( "--dry-run", "-dr", action="store_true", help="Prints source and target SQL to stdout in lieu of performing a validation.", ) run_parser.add_argument( "--config-file", "-c", help="YAML Config File path to be used for building or running validations.", ) run_parser.add_argument( "--config-dir", "-cdir", help="Directory path containing YAML Config Files to be used for running validations.", ) run_parser.add_argument( "--kube-completions", "-kc", action="store_true", help="When validating multiple table partitions generated by generate-table-partitions, using DVT in Kubernetes in index completion mode use this flag so that all the validations are completed", ) get_parser = configs_subparsers.add_parser( "get", help="Get and print a validation config" ) get_parser.add_argument( "--config-file", "-c", help="YAML Config File Path to be used for building or running validations.", ) def _configure_connection_parser(subparsers): """Configure the Parser for Connection Management.""" connection_parser = subparsers.add_parser( "connections", help="Manage & Store connections to your Databases" ) connect_subparsers = connection_parser.add_subparsers(dest="connect_cmd") _ = connect_subparsers.add_parser("list", help="List your connections") add_parser = connect_subparsers.add_parser("add", help="Store a new connection") add_parser.add_argument( "--connection-name", "-c", help="Name of connection used as reference" ) add_parser.add_argument( "--secret-manager-type", "-sm", default=None, help="Secret manager type to store credentials by default will be None ", ) add_parser.add_argument( "--secret-manager-project-id", "-sm-prj-id", default=None, help="Project ID for the secret manager that stores the credentials", ) _configure_database_specific_parsers(add_parser) delete_parser = connect_subparsers.add_parser( "delete", help="Delete an existing connection" ) delete_parser.add_argument( "--connection-name", "-c", required=True, help="Name of connection to delete" ) describe_parser = connect_subparsers.add_parser( "describe", help="Describe an existing connection" ) describe_parser.add_argument( "--connection-name", "-c", required=True, help="Name of connection to describe" ) describe_parser.add_argument( "--format", "-f", dest="output_format", choices=["json", "yaml"], default="yaml", help="Output format for the configuration (default: yaml)", ) def _configure_database_specific_parsers(parser): """Configure a separate subparser for each supported DB.""" subparsers = parser.add_subparsers(dest="connect_type") raw_parser = subparsers.add_parser( "Raw", help="Supply Raw JSON config for a connection" ) raw_parser.add_argument("--json", "-j", help="Json string config") for database in CONNECTION_SOURCE_FIELDS: article = "an" if database[0].lower() in "aeiou" else "a" db_parser = subparsers.add_parser( database, help=f"Store {article} {database} connection" ) for field_obj in CONNECTION_SOURCE_FIELDS[database]: arg_field = "--" + field_obj[0].replace("_", "-") help_txt = field_obj[1] db_parser.add_argument(arg_field, help=help_txt) def _configure_validate_parser(subparsers): """Configure arguments to run validations.""" validate_parser = subparsers.add_parser("validate", help=VALIDATE_HELP_TEXT) validate_parser.add_argument( "--dry-run", "-dr", action="store_true", help="Prints source and target SQL to stdout in lieu of performing a validation.", ) validate_subparsers = validate_parser.add_subparsers(dest="validate_cmd") column_parser = validate_subparsers.add_parser( "column", help=VALIDATE_COLUMN_HELP_TEXT ) _configure_column_parser(column_parser) row_parser = validate_subparsers.add_parser("row", help=VALIDATE_ROW_HELP_TEXT) optional_arguments = row_parser.add_argument_group("optional arguments") required_arguments = row_parser.add_argument_group("required arguments") _configure_row_parser(row_parser, optional_arguments, required_arguments) schema_parser = validate_subparsers.add_parser( "schema", help=VALIDATE_SCHEMA_HELP_TEXT ) _configure_schema_parser(schema_parser) custom_query_parser = validate_subparsers.add_parser( "custom-query", help=VALIDATE_CUSTOM_QUERY_HELP_TEXT ) _configure_custom_query_parser(custom_query_parser) def _configure_row_parser( parser, optional_arguments, required_arguments, is_generate_partitions=False, is_custom_query=False, ): """Configure arguments to run row level validations.""" # Group optional arguments optional_arguments.add_argument( "--primary-keys", "-pk", help=( "Comma separated list of primary key columns 'col_a,col_b', " "when not specified the value will be inferred from the source or target table if available" ), ) optional_arguments.add_argument( "--threshold", "-th", type=threshold_float, default=0.0, help="Float max threshold for percent difference", ) optional_arguments.add_argument( "--exclude-columns", "-ec", action="store_true", help="Flag to indicate the list of columns should be excluded from hash or concat instead of included.", ) optional_arguments.add_argument( "--filters", "-filters", type=get_filters, default=[], help="Filters in the format source_filter:target_filter", ) optional_arguments.add_argument( "--trim-string-pks", "-tsp", action="store_true", help=( "Trims string based primary key values, intended for use when one engine uses " "padded string semantics (e.g. CHAR(n)) and the other does not (e.g. VARCHAR(n))." ), ) optional_arguments.add_argument( "--case-insensitive-match", "-cim", action="store_true", help=( "Performs a case insensitive match by adding an UPPER() before comparison." ), ) optional_arguments.add_argument( "--max-concat-columns", "-mcc", type=int, help=( "The maximum number of columns accepted by a --hash or --concat validation. When there are " "more columns than this the validation will implicitly be split into multiple validations. " "This option has engine specific defaults." ), ) # Generate-table-partitions and custom-query does not support random row if not (is_generate_partitions or is_custom_query): optional_arguments.add_argument( "--use-random-row", "-rr", action="store_true", help="Finds a set of random rows of the first primary key supplied.", ) optional_arguments.add_argument( "--random-row-batch-size", "-rbs", help="Row batch size used for random row filters (default 10,000).", ) # Generate table partitions follows a new argument spec where either the table names or queries can be provided, but not both. # that is specified in configure_partition_parser. If we use the same spec for row and column validation, the custom query commands # may get subsumed by validate and validate commands by specifying tables name or queries. Until this -tbls will be # a required argument for validate row, validate column and validate schema. required_arguments.add_argument( "--tables-list", "-tbls", default=None, required=True, help="Comma separated tables list in the form 'schema.table=target_schema.target_table'", ) # Group for mutually exclusive required arguments. Either must be supplied mutually_exclusive_arguments = required_arguments.add_mutually_exclusive_group( required=True ) mutually_exclusive_arguments.add_argument( "--hash", "-hash", help=( "Comma separated list of columns for hash 'col_a,col_b' or * for " "all columns" ), ) mutually_exclusive_arguments.add_argument( "--concat", "-concat", help=( "Comma separated list of columns for concat 'col_a,col_b' or * " "for all columns" ), ) mutually_exclusive_arguments.add_argument( "--comparison-fields", "-comp-fields", help=( "Individual columns to compare. If comparing a calculated field use " "the column alias." ), ) _add_common_arguments( optional_arguments, required_arguments, is_generate_partitions=is_generate_partitions, ) def _configure_column_parser(column_parser): """Configure arguments to run column level validations.""" # Group optional arguments optional_arguments = column_parser.add_argument_group("optional arguments") optional_arguments.add_argument( "--count", "-count", help="Comma separated list of columns for count 'col_a,col_b' or * for all columns", ) optional_arguments.add_argument( "--sum", "-sum", help="Comma separated list of columns for sum 'col_a,col_b' or * for all columns", ) optional_arguments.add_argument( "--avg", "-avg", help="Comma separated list of columns for avg 'col_a,col_b' or * for all columns", ) optional_arguments.add_argument( "--min", "-min", help="Comma separated list of columns for min 'col_a,col_b' or * for all columns", ) optional_arguments.add_argument( "--max", "-max", help="Comma separated list of columns for max 'col_a,col_b' or * for all columns", ) optional_arguments.add_argument( "--bit_xor", "-bit_xor", help="Comma separated list of columns for hashing a concatenate 'col_a,col_b' or * for all columns", ) optional_arguments.add_argument( "--std", "-std", help="Comma separated list of columns for standard deviation 'col_a,col_b' or * for all columns", ) optional_arguments.add_argument( "--grouped-columns", "-gc", help="Comma separated list of columns to use in GroupBy 'col_a,col_b'", ) optional_arguments.add_argument( "--exclude-columns", "-ec", action="store_true", help="Flag to indicate the list of columns should be excluded from validation and not included.", ) optional_arguments.add_argument( "--threshold", "-th", type=threshold_float, default=0.0, help="Float max threshold for percent difference", ) optional_arguments.add_argument( "--filters", "-filters", type=get_filters, default=[], help="Filters in the format source_filter:target_filter", ) optional_arguments.add_argument( "--wildcard-include-string-len", "-wis", action="store_true", help="Include string fields for wildcard aggregations.", ) optional_arguments.add_argument( "--wildcard-include-timestamp", "-wit", action="store_true", help="Include timestamp/date fields for wildcard aggregations.", ) optional_arguments.add_argument( "--cast-to-bigint", "-ctb", action="store_true", help="Cast any int32 fields to int64 for large aggregations.", ) # Group required arguments required_arguments = column_parser.add_argument_group("required arguments") required_arguments.add_argument( "--tables-list", "-tbls", default=None, required=True, help="Comma separated tables list in the form 'schema.table=target_schema.target_table'. Or shorthand schema.* for all tables.", ) _add_common_arguments(optional_arguments, required_arguments) def _configure_schema_parser(schema_parser): """Configure arguments to run schema level validations.""" # Group optional arguments optional_arguments = schema_parser.add_argument_group("optional arguments") optional_arguments.add_argument( "--exclusion-columns", "-ec", help="Comma separated list of columns 'col_a,col_b' to be excluded from the schema validation", ) optional_arguments.add_argument( "--allow-list", "-al", help="Comma separated list of datatype mappings due to incompatible datatypes in source and target. e.g.: decimal(12,2):decimal(38,9),!string:string,decimal(10-18,0):int64", ) optional_arguments.add_argument( "--allow-list-file", "-alf", help="YAML file containing default --allow-list mappings. Can be used in conjunction with --allow-list. e.g.: samples/allow_list/oracle_to_bigquery.yaml or gs://dvt-allow-list-files/oracle_to_bigquery.yaml. See example files in samples/allow_list/", ) # Group required arguments required_arguments = schema_parser.add_argument_group("required arguments") required_arguments.add_argument( "--tables-list", "-tbls", default=None, required=True, help="Comma separated tables list in the form 'schema.table=target_schema.target_table'", ) _add_common_arguments(optional_arguments, required_arguments) def _configure_custom_query_parser(custom_query_parser): """Configure arguments to run custom-query validations.""" custom_query_subparsers = custom_query_parser.add_subparsers( dest="custom_query_type" ) # Add arguments for custom-query row parser custom_query_row_parser = custom_query_subparsers.add_parser( "row", help="Run a custom query row validation" ) _configure_custom_query_row_parser(custom_query_row_parser) # Add arguments for custom-query column parser custom_query_column_parser = custom_query_subparsers.add_parser( "column", help="Run a custom query column validation" ) _configure_custom_query_column_parser(custom_query_column_parser) def _configure_custom_query_row_parser(custom_query_row_parser): optional_arguments = custom_query_row_parser.add_argument_group( "optional arguments" ) required_arguments = custom_query_row_parser.add_argument_group( "required arguments" ) _configure_row_parser( custom_query_row_parser, optional_arguments, required_arguments, is_custom_query=True, ) # Group for mutually exclusive source query arguments. Either must be supplied source_mutually_exclusive = required_arguments.add_mutually_exclusive_group( required=True ) source_mutually_exclusive.add_argument( "--source-query-file", "-sqf", help="File containing the source sql query", ) source_mutually_exclusive.add_argument( "--source-query", "-sq", help="Source sql query", ) # Group for mutually exclusive target query arguments. Either must be supplied target_mutually_exclusive = required_arguments.add_mutually_exclusive_group( required=True ) target_mutually_exclusive.add_argument( "--target-query-file", "-tqf", help="File containing the target sql query", ) target_mutually_exclusive.add_argument( "--target-query", "-tq", help="Target sql query", ) def _configure_custom_query_column_parser(custom_query_column_parser): # Group optional arguments optional_arguments = custom_query_column_parser.add_argument_group( "optional arguments" ) optional_arguments.add_argument( "--count", "-count", help="Comma separated list of columns for count 'col_a,col_b' or * for all columns", ) optional_arguments.add_argument( "--sum", "-sum", help="Comma separated list of columns for sum 'col_a,col_b' or * for all columns", ) optional_arguments.add_argument( "--avg", "-avg", help="Comma separated list of columns for avg 'col_a,col_b' or * for all columns", ) optional_arguments.add_argument( "--min", "-min", help="Comma separated list of columns for min 'col_a,col_b' or * for all columns", ) optional_arguments.add_argument( "--max", "-max", help="Comma separated list of columns for max 'col_a,col_b' or * for all columns", ) optional_arguments.add_argument( "--bit_xor", "-bit_xor", help="Comma separated list of columns for hashing a concatenate 'col_a,col_b' or * for all columns", ) optional_arguments.add_argument( "--std", "-std", help="Comma separated list of columns for standard deviation 'col_a,col_b' or * for all columns", ) optional_arguments.add_argument( "--exclude-columns", "-ec", action="store_true", help="Flag to indicate the list of columns should be excluded from validation and not included.", ) optional_arguments.add_argument( "--wildcard-include-string-len", "-wis", action="store_true", help="Include string fields for wildcard aggregations.", ) optional_arguments.add_argument( "--wildcard-include-timestamp", "-wit", action="store_true", help="Include timestamp/date fields for wildcard aggregations.", ) optional_arguments.add_argument( "--cast-to-bigint", "-ctb", action="store_true", help="Cast any int32 fields to int64 for large aggregations.", ) optional_arguments.add_argument( "--filters", "-filters", type=get_filters, default=[], help="Filters in the format source_filter:target_filter", ) optional_arguments.add_argument( "--threshold", "-th", type=threshold_float, default=0.0, help="Float max threshold for percent difference", ) # Group required arguments required_arguments = custom_query_column_parser.add_argument_group( "required arguments" ) # Group for mutually exclusive source query arguments. Either must be supplied source_mutually_exclusive = required_arguments.add_mutually_exclusive_group( required=True ) source_mutually_exclusive.add_argument( "--source-query-file", "-sqf", help="File containing the source sql query", ) source_mutually_exclusive.add_argument( "--source-query", "-sq", help="Source sql query", ) # Group for mutually exclusive target query arguments. Either must be supplied target_mutually_exclusive = required_arguments.add_mutually_exclusive_group( required=True ) target_mutually_exclusive.add_argument( "--target-query-file", "-tqf", help="File containing the target sql query", ) target_mutually_exclusive.add_argument( "--target-query", "-tq", help="Target sql query", ) _add_common_arguments(optional_arguments, required_arguments) def _add_common_arguments( optional_arguments, required_arguments, is_generate_partitions=False ): # Group all Required Arguments together required_arguments.add_argument( "--source-conn", "-sc", required=True, help="Source connection name" ) required_arguments.add_argument( "--target-conn", "-tc", required=True, help="Target connection name" ) # Optional arguments optional_arguments.add_argument( "--bq-result-handler", "-bqrh", help=argparse.SUPPRESS ) optional_arguments.add_argument( "--result-handler", "-rh", help=( "Result handler connection details. " "CONNECTION_NAME.SCHEMA.TABLE or BQ_PROJECT_ID.DATASET.TABLE." ), ) optional_arguments.add_argument( "--labels", "-l", help="Key value pair labels for validation run" ) optional_arguments.add_argument( "--service-account", "-sa", help="Path to SA key file for result handler output", ) if not is_generate_partitions: optional_arguments.add_argument( "--config-file", "-c", help="Store the validation config in the YAML File Path specified", ) optional_arguments.add_argument( "--config-file-json", "-cj", help="Store the validation config in the JSON File Path specified to be used for application use cases", ) optional_arguments.add_argument( "--format", "-fmt", default=consts.FORMAT_TYPE_TABLE, help="Set the format for printing command output, Supported formats are (text, csv, json, table). Defaults " "to table", ) optional_arguments.add_argument( "--filter-status", "-fs", # TODO: update if we start to support other statuses help="Comma separated list of statuses to filter the validation results. Supported statuses are (success, fail). If no list is provided, all statuses are returned", ) optional_arguments.add_argument( "--run-id", "-rid", default=None, help="Set a string for the run_id, if None is input then a randomly generated UUID will be used, which is the default behaviour", ) def _check_positive(value: int, lower_bound: int = 1) -> int: ivalue = int(value) if ivalue < lower_bound: if lower_bound == 1: raise argparse.ArgumentTypeError( f"{value} is an invalid positive int value" ) else: raise argparse.ArgumentTypeError(f"Value must be >= {lower_bound}: {value}") return ivalue def _check_gt_one(value: int) -> int: return _check_positive(value, lower_bound=2) def check_no_yaml_files(partition_num: int, parts_per_file: int): """Check that number of yaml files generated is less than 10,001 Will be invoked after all the arguments are processed.""" if math.ceil(partition_num / parts_per_file) < 10001: return else: raise argparse.ArgumentTypeError( f"partition-num={partition_num} results in more than the maximum number of yaml files (i.e. 10,000). Reduce the number of yaml files by using the --parts-per-file argument or decreasing the number of partitions." ) def get_connection_config_from_args(args): """Return dict with connection config supplied.""" config = { consts.SOURCE_TYPE: args.connect_type, consts.SECRET_MANAGER_TYPE: getattr(args, consts.SECRET_MANAGER_TYPE), consts.SECRET_MANAGER_PROJECT_ID: getattr( args, consts.SECRET_MANAGER_PROJECT_ID ), } if args.connect_type == "Raw": return json.loads(args.json) for field_obj in CONNECTION_SOURCE_FIELDS[args.connect_type]: field = field_obj[0] if getattr(args, field) is None: continue config[field] = getattr(args, field) return config def threshold_float(x): """Restrict threshold arg to be a positive float.""" try: x = float(x) except ValueError: raise argparse.ArgumentTypeError("%r not a floating-point literal" % (x,)) if x < 0.0 or x > sys.float_info.max: raise argparse.ArgumentTypeError( "%r must be positive and below the max float value" % (x,) ) elif x != x: raise argparse.ArgumentTypeError("%r must be a number" % (x,)) return x def _generate_random_name(conn): name = f"{conn[consts.SOURCE_TYPE]}_{str(uuid.uuid4())}" return name def store_connection(connection_name, conn): """Store the connection config under the given name.""" mgr = state_manager.StateManager() mgr.create_connection(connection_name, conn) def delete_connection(connection_name): """Delete the connection config under the given name.""" mgr = state_manager.StateManager() mgr.delete_connection(connection_name) def list_connections(): """List all saved connections.""" mgr = state_manager.StateManager() connections = mgr.list_connections() for conn_name in connections: source_type = mgr.get_connection_config(conn_name).get("source_type") print(f"Connection Name: {conn_name}") print(f"Source Type: {source_type}\n") return connections def describe_connection(connection_name, output_format): """Return yaml connection details for a specific connection""" mgr = state_manager.StateManager() connection_details = mgr.describe_connection(connection_name, output_format) print(connection_details) return connection_details def get_connection(connection_name): """Return dict connection details for a specific connection.""" mgr = state_manager.StateManager() return mgr.get_connection_config(connection_name) def store_validation(validation_file_name, config, include_log=True): """Store the validation config under the given name.""" validation_path = gcs_helper.get_validation_path(validation_file_name) if validation_file_name.endswith(".yaml"): config_str = dump(config, Dumper=Dumper) elif validation_file_name.endswith("json"): config_str = json.dumps(config) else: raise ValueError(f"Invalid validation file name: {validation_file_name}") gcs_helper.write_file(validation_path, config_str, include_log=include_log) def get_validation(name: str, config_dir: str = None): """Return validation YAML config.""" if config_dir: validation_path = os.path.join(config_dir, name) else: validation_path = gcs_helper.get_validation_path(name) validation_bytes = gcs_helper.read_file(validation_path) return load(validation_bytes, Loader=Loader) def list_validations(config_dir="./"): """List all saved validation YAMLs in a directory.""" logging.info(f"Looking for validations in path {config_dir}") if gcs_helper._is_gcs_path(config_dir): if not config_dir.endswith("/"): config_dir += "/" files = gcs_helper.list_gcs_directory(config_dir) else: files = os.listdir(config_dir) return [file_name for file_name in files if file_name.endswith(".yaml")] def print_validations_in_dir(config_dir="./"): validations = list_validations(config_dir=config_dir) logging.info("Validation YAMLs found:") for validation_name in validations: logging.info(validation_name) def get_labels(arg_labels): """Return list of tuples representing key-value label pairs.""" labels = [] if arg_labels: pairs = arg_labels.split(",") for pair in pairs: kv = pair.split("=") if len(kv) == 2: labels.append((kv[0], kv[1])) else: raise ValueError("Labels must be comma-separated key-value pairs.") return labels def get_filters(filter_value: str) -> List[Dict]: """Returns filters for source and target from --filters argument. A filter is the condition that is used in a SQL WHERE clause. If only one filter is specified, it applies to both source and target For a doc on regular expression for filters see docs/internal/filters_regex.md """ filters = util.split_not_in_quotes(filter_value, ":") if len(filters) not in (1, 2): raise argparse.ArgumentTypeError("Unable to parse filter arguments.") filters = [_.strip() for _ in filters] if len(filters) == 1: if not filters[0]: raise argparse.ArgumentTypeError("Empty string not allowed in filter") filter_dict = { "type": "custom", "source": filters[0], "target": filters[0], } elif len(filters) == 2: if not filters[0] or not filters[1]: raise argparse.ArgumentTypeError("Empty string not allowed in filter") filter_dict = { "type": "custom", "source": filters[0], "target": filters[1], } filter_config = [ filter_dict, ] return filter_config def _get_result_handler(rc_value: str, sa_file=None) -> dict: """Returns dict of result handler config. Backwards compatible for JSON input. rc_value (str): Result config argument specified. sa_file (str): SA path argument specified. """ config = rc_value.split(".", 1) if len(config) != 2: raise ValueError(f"Unable to parse result handler config: `{rc_value}`") # Check if the first part of the result handler is a connection name. mgr = state_manager.StateManager() connections = mgr.list_connections() if config[0] in connections: # We received connection_name.results_table. conn_from_file = get_connection(config[0]) if conn_from_file[consts.SOURCE_TYPE] == consts.SOURCE_TYPE_BIGQUERY: result_handler = { consts.RH_TYPE: conn_from_file[consts.SOURCE_TYPE], consts.PROJECT_ID: conn_from_file["project_id"], consts.TABLE_ID: config[1], consts.API_ENDPOINT: conn_from_file.get("api_endpoint", None), } elif conn_from_file[consts.SOURCE_TYPE] == consts.SOURCE_TYPE_POSTGRES: result_handler = { consts.RH_TYPE: conn_from_file[consts.SOURCE_TYPE], consts.TABLE_ID: config[1], consts.RH_CONN: conn_from_file, } # TODO Add filesytem handler too. else: raise exceptions.ResultHandlerException( f"Unsupported result handler connection type: {conn_from_file[consts.SOURCE_TYPE]}" ) else: # We received legacy format "bq-project-name.bq_results_table". result_handler = { consts.RH_TYPE: consts.SOURCE_TYPE_BIGQUERY, consts.PROJECT_ID: config[0], consts.TABLE_ID: config[1], } if sa_file: result_handler[consts.GOOGLE_SERVICE_ACCOUNT_KEY_PATH] = sa_file return result_handler def get_arg_list(arg_value, default_value=None): """Returns list of values from argument provided. Backwards compatible for JSON input. arg_value (str): Argument supplied default_value (Any): A default value to supply when arg_value is empty. """ if not arg_value: return default_value return _read_json_value(arg_value) or arg_value.split(",") def _read_json_value(arg_value: str) -> list: """Returns a deserialized JSON value or None if an error occurs.""" try: if isinstance(arg_value, list): arg_value = str(arg_value) return json.loads(arg_value) except json.decoder.JSONDecodeError: return None def get_tables_list(arg_tables, default_value=None, is_filesystem=False): """Returns dictionary of tables. Backwards compatible for JSON input. arg_table (str): tables_list argument specified default_value (Any): A default value to supply when arg_value is empty. is_filesystem (boolean): Boolean indicating whether source connection is a FileSystem. In this case, a schema is not required. """ if not arg_tables: return default_value json_tables_list = _read_json_value(arg_tables) if json_tables_list: return json_tables_list tables_list = [] tables_mapping = list(csv.reader([arg_tables]))[0] source_schema_required = bool(not is_filesystem) for mapping in tables_mapping: tables_map = mapping.split("=") if len(tables_map) == 1: schema, table = split_table( tables_map, schema_required=source_schema_required ) table_dict = { "schema_name": schema, "table_name": table, } elif len(tables_map) == 2: src_schema, src_table = split_table( [tables_map[0]], schema_required=source_schema_required ) table_dict = { "schema_name": src_schema, "table_name": src_table, } targ_schema, targ_table = split_table( [tables_map[1]], schema_required=False ) if targ_schema: table_dict["target_schema_name"] = targ_schema table_dict["target_table_name"] = targ_table else: raise ValueError( "Unable to parse tables list. Please provide valid mapping." ) tables_list.append(table_dict) return tables_list def split_table(table_ref, schema_required=True): """Returns schema and table name given list of input values. table_ref (List): Table reference i.e ['my.schema.my_table'] schema_required (boolean): Indicates whether schema is required. A source table reference requires schema. A target table reference does not. """ table_ref_list = list(csv.reader(table_ref, delimiter=".", quotechar='"'))[0] if len(table_ref_list) == 1 and schema_required: raise ValueError("Please provide schema in tables list.") elif len(table_ref_list) == 1: return None, table_ref_list[0].strip() table = table_ref_list.pop() schema = ".".join(table_ref_list) return schema.strip(), table.strip() def get_query_from_file(filename): """Return query from input file""" query = "" try: query = gcs_helper.read_file(filename, download_as_text=True) query = query.rstrip(";\n") except IOError: logging.error("Cannot read query file: ", filename) if not query or query.isspace(): raise ValueError( "Expected file with sql query, got empty file or file with white spaces. " f"input file: {filename}" ) return query def get_query_from_inline(inline_query): """Return query from inline query arg""" query = inline_query.strip() query = query.rstrip(";\n") if not query or query.isspace(): raise ValueError( "Expected arg with sql query, got empty arg or arg with white " f"spaces. input query: '{inline_query}'" ) return query def get_query_from_query_args(query_str_arg, query_file_arg) -> str: if query_str_arg: return get_query_from_inline(query_str_arg) else: return get_query_from_file(query_file_arg) def _max_concat_columns( max_concat_columns_option: int, source_client, target_client ) -> Optional[int]: """Determine any upper limit on number of columns allowed into concat() operation.""" if max_concat_columns_option: # User specified limit takes precedence. return max_concat_columns_option else: source_max = consts.MAX_CONCAT_COLUMNS_DEFAULTS.get(source_client.name, None) target_max = consts.MAX_CONCAT_COLUMNS_DEFAULTS.get(target_client.name, None) if source_max and target_max: return min(source_max, target_max) else: return source_max or target_max def _concat_column_count_configs( cols: list, pre_build_configs: dict, arg_to_override: str, max_col_count: int, ) -> list: """ Ensure we don't have too many columns for the engines involved. https://github.com/GoogleCloudPlatform/professional-services-data-validator/issues/1216 """ return_list = [] if max_col_count and len(cols) > max_col_count: for col_chunk in list_to_sublists(cols, max_col_count): col_csv = ",".join(col_chunk) pre_build_configs_copy = copy.copy(pre_build_configs) pre_build_configs_copy[arg_to_override] = col_csv return_list.append(pre_build_configs_copy) else: return_list.append(pre_build_configs) return return_list def get_pre_build_configs(args: "Namespace", validate_cmd: str) -> List[Dict]: """Return a dict of configurations to build ConfigManager object""" def cols_from_arg(concat_arg: str, client, table_obj: dict, query_str: str) -> list: if concat_arg == "*": # If validating with "*" then we need to expand to count the columns. if table_obj: return clients.get_ibis_table_schema( client, table_obj["schema_name"], table_obj["table_name"], ).names else: return clients.get_ibis_query_schema( client, query_str, ).names else: return get_arg_list(concat_arg) # validate_cmd will be set to 'row`, or 'Custom-query' if invoked by generate-table-partitions depending # on what is being partitioned. Otherwise validate_cmd will be set to None if validate_cmd is None: validate_cmd = args.validate_cmd.capitalize() if validate_cmd == "Schema": config_type = consts.SCHEMA_VALIDATION elif validate_cmd == "Column": config_type = consts.COLUMN_VALIDATION elif validate_cmd == "Row": config_type = consts.ROW_VALIDATION elif validate_cmd == "Custom-query": config_type = consts.CUSTOM_QUERY else: raise ValueError(f"Unknown Validation Type: {validate_cmd}") # Cater for legacy -bqrh. args.result_handler = args.result_handler or args.bq_result_handler # Get result handler config if args.result_handler: result_handler_config = _get_result_handler( args.result_handler, args.service_account ) else: result_handler_config = None # Set filter_config and threshold. Not supported in case of schema validation filter_config = getattr(args, consts.CONFIG_FILTERS, []) threshold = getattr(args, consts.CONFIG_THRESHOLD, 0.0) # Get labels if args.labels is None: labels = [] else: labels = get_labels(args.labels) # Get source and target clients mgr = state_manager.StateManager() source_client = clients.get_data_client(mgr.get_connection_config(args.source_conn)) target_client = clients.get_data_client(mgr.get_connection_config(args.target_conn)) # Get format: text, csv, json, table. Default is table format = args.format if args.format else consts.FORMAT_TYPE_TABLE # Get random row arguments. Only in row validations these attributes can be present. # Bad coding here, but keeping it so as not to introduce a breaking change. See # consts.py Line 17 for a more detailed explanation. use_random_rows = getattr(args, "use_random_row", False) random_row_batch_size = getattr(args, consts.CONFIG_RANDOM_ROW_BATCH_SIZE, None) # Get table list. Not supported in case of custom query validation is_filesystem = source_client._source_type == "FileSystem" query_str = None if config_type == consts.CUSTOM_QUERY: tables_list = get_tables_list( None, default_value=[{}], is_filesystem=is_filesystem ) query_str = get_query_from_query_args(args.source_query, args.source_query_file) else: tables_list = get_tables_list( args.tables_list, default_value=[{}], is_filesystem=is_filesystem ) # Get validation filter status: success, fail if args.filter_status is not None: arg_list = get_arg_list(args.filter_status) if all(arg in consts.VALIDATION_STATUSES for arg in arg_list): filter_status = arg_list else: raise ValueError("An unsupported status was provided") else: filter_status = None pre_build_configs_list = [] if config_type != consts.CUSTOM_QUERY: tables_list = find_tables.expand_tables_of_asterisk( tables_list, source_client, target_client ) for table_obj in tables_list: pre_build_configs = { "config_type": config_type, consts.CONFIG_SOURCE_CONN_NAME: args.source_conn, consts.CONFIG_TARGET_CONN_NAME: args.target_conn, "table_obj": table_obj, consts.CONFIG_LABELS: labels, consts.CONFIG_THRESHOLD: threshold, consts.CONFIG_FORMAT: format, consts.CONFIG_USE_RANDOM_ROWS: use_random_rows, consts.CONFIG_RANDOM_ROW_BATCH_SIZE: random_row_batch_size, "source_client": source_client, "target_client": target_client, "result_handler_config": result_handler_config, "filter_config": filter_config, consts.CONFIG_FILTER_STATUS: filter_status, consts.CONFIG_TRIM_STRING_PKS: getattr( args, consts.CONFIG_TRIM_STRING_PKS, False ), consts.CONFIG_CASE_INSENSITIVE_MATCH: getattr( args, consts.CONFIG_CASE_INSENSITIVE_MATCH, False ), consts.CONFIG_ROW_CONCAT: getattr(args, consts.CONFIG_ROW_CONCAT, None), consts.CONFIG_ROW_HASH: getattr(args, consts.CONFIG_ROW_HASH, None), consts.CONFIG_RUN_ID: getattr(args, consts.CONFIG_RUN_ID, None), "verbose": args.verbose, } if ( pre_build_configs[consts.CONFIG_ROW_CONCAT] or pre_build_configs[consts.CONFIG_ROW_HASH] ): # Ensure we don't have too many columns for the engines involved. cols = cols_from_arg( pre_build_configs[consts.CONFIG_ROW_HASH] or pre_build_configs[consts.CONFIG_ROW_CONCAT], source_client, table_obj, query_str, ) new_pre_build_configs = _concat_column_count_configs( cols, pre_build_configs, consts.CONFIG_ROW_HASH if args.hash else consts.CONFIG_ROW_CONCAT, _max_concat_columns( args.max_concat_columns, source_client, target_client ), ) if len(new_pre_build_configs) > 1: message_type = ( f'{table_obj["schema_name"]}.{table_obj["table_name"]}' if table_obj else "custom query" ) logging.info( f"Splitting validation into {len(new_pre_build_configs)} queries for {message_type}" ) pre_build_configs_list.extend(new_pre_build_configs) else: pre_build_configs_list.append(pre_build_configs) return pre_build_configs_list