in src/translation/dvt/main.py [0:0]
def get_dvt_cmd_ddl_validation(config, table, validation_params_from_gcs):
cmd_errors = ""
print(f"Running validation for table: {table}")
source_conn, source_conn_string = connection_string(
config["validation_config"]["source_config"].items()
)
target_conn, target_conn_string = connection_string(
config["validation_config"]["target_config"].items()
)
add_conn = create_connection.format(
source_conn_string=source_conn_string, target_conn_string=target_conn_string
)
validation_type = config["validation_config"]["validation_type"]
if validation_type == "schema":
validation_command = schema_validation.format(
source_conn=source_conn,
target_conn=target_conn,
table=table,
bq_table=table.split("=")[-1],
project_id=PROJECT_ID,
unique_id=config["unique_id"],
)
elif validation_type == "column":
validation_command = column_validation.format(
source_conn=source_conn,
target_conn=target_conn,
table=table,
bq_table=table.split("=")[-1],
project_id=PROJECT_ID,
unique_id=config["unique_id"],
)
elif validation_type == "row":
validation_command = row_validation.format(
source_conn=source_conn,
target_conn=target_conn,
table=table,
bq_table=table.split("=")[-1],
project_id=PROJECT_ID,
unique_id=config["unique_id"],
)
additional_validation_flags = get_additional_validation_flags(
ValidationEntity.Table,
table,
validation_type_flags_mapping[validation_type],
validation_params_from_gcs,
)
validation_command += additional_validation_flags
print(f"DVT command to be executed: {validation_command}")
dvt_bash = " ) && ( ".join(["( " + add_conn, validation_command + " )"])
return dvt_bash, cmd_errors