in src/translation/dvt/main.py [0:0]
def get_dvt_cmd_sql_validation(config, sql_file, validation_params_from_gcs):
    """Assemble the shell command for a custom-query validation of one SQL file.

    The command is three parenthesised steps chained with ``&&``: the
    connection-creation command, the SQL-file copy command, and the
    custom-query validation command (row or column, selected by
    ``config["validation_config"]["validation_type"]``) with any additional
    flags from ``get_additional_validation_flags`` appended.

    Args:
        config: Mapping read for ``validation_config`` (``source_config``,
            ``target_config``, ``validation_type``), ``migrationTask`` ->
            ``translationConfigDetails`` (``gcsSourcePath``,
            ``gcsTargetPath``) and ``unique_id``.
        sql_file: SQL file name substituted into the copy and validation
            command templates.
        validation_params_from_gcs: Passed through unchanged to
            ``get_additional_validation_flags``.

    Returns:
        A ``(command, errors)`` tuple. On success ``command`` is the full
        chained command and ``errors`` is an empty string. When the
        validation type is neither ``"row"`` nor ``"column"``, ``command`` is
        only the connection-creation command and ``errors`` holds the
        message describing the unknown type.
    """
    errors = ""
    print(f"Running validation for sql file: {sql_file}")

    # Connection setup: source is resolved before target.
    validation_config = config["validation_config"]
    source_conn, source_conn_string = connection_string(
        validation_config["source_config"].items()
    )
    target_conn, target_conn_string = connection_string(
        validation_config["target_config"].items()
    )
    add_conn = create_connection.format(
        source_conn_string=source_conn_string,
        target_conn_string=target_conn_string,
    )

    # Command that copies the SQL file, built from the translation GCS paths.
    translation_details = config["migrationTask"]["translationConfigDetails"]
    gcs_source_path = translation_details["gcsSourcePath"]
    gcs_target_path = translation_details["gcsTargetPath"]
    copy_sql_command = copy_sql_files.format(
        source_gcs=gcs_source_path,
        target_gcs=gcs_target_path,
        sql_file=sql_file,
    )

    # Choose the command template; anything other than row/column bails out
    # early with just the connection command and an error message.
    custom_query_validation_type = validation_config["validation_type"]
    if custom_query_validation_type == "row":
        command_template = custom_query_row_validation
    elif custom_query_validation_type == "column":
        command_template = custom_query_column_validation
    else:
        unknown_type_message = f"Unknown validation type: {custom_query_validation_type} passed with config['type'] == 'sql'!"
        print(unknown_type_message)
        errors += unknown_type_message
        return add_conn, errors

    # Row and column templates take the same set of placeholders.
    validation_command = command_template.format(
        source_conn=source_conn,
        target_conn=target_conn,
        sql_file=sql_file,
        project_id=PROJECT_ID,
        unique_id=config["unique_id"],
    )
    validation_command = validation_command + get_additional_validation_flags(
        ValidationEntity.File,
        sql_file,
        validation_type_flags_mapping[custom_query_validation_type],
        validation_params_from_gcs,
    )
    print(f"DVT command to be executed: {validation_command}")

    # Wrap each step in parentheses and chain them so a failing step stops
    # the ones after it.
    steps = (add_conn, copy_sql_command, validation_command)
    dvt_bash = "( " + " ) && ( ".join(steps) + " )"
    return dvt_bash, errors