def get_dvt_cmd_sql_validation()

in src/translation/dvt/main.py [0:0]


def get_dvt_cmd_sql_validation(config, sql_file, validation_params_from_gcs):
    cmd_errors = ""
    print(f"Running validation for sql file: {sql_file}")

    source_conn, source_conn_string = connection_string(
        config["validation_config"]["source_config"].items()
    )
    target_conn, target_conn_string = connection_string(
        config["validation_config"]["target_config"].items()
    )

    add_conn = create_connection.format(
        source_conn_string=source_conn_string, target_conn_string=target_conn_string
    )

    translated_config = config["migrationTask"]["translationConfigDetails"]
    source_gcs = translated_config["gcsSourcePath"]
    target_gcs = translated_config["gcsTargetPath"]
    copy_sql_command = copy_sql_files.format(
        source_gcs=source_gcs, target_gcs=target_gcs, sql_file=sql_file
    )
    custom_query_validation_type = config["validation_config"]["validation_type"]

    if custom_query_validation_type == "row":
        custom_validation_command = custom_query_row_validation.format(
            source_conn=source_conn,
            target_conn=target_conn,
            sql_file=sql_file,
            project_id=PROJECT_ID,
            unique_id=config["unique_id"],
        )

    elif custom_query_validation_type == "column":
        custom_validation_command = custom_query_column_validation.format(
            source_conn=source_conn,
            target_conn=target_conn,
            sql_file=sql_file,
            project_id=PROJECT_ID,
            unique_id=config["unique_id"],
        )

    else:
        print(
            f"Unknown validation type: {custom_query_validation_type} passed with config['type'] == 'sql'!"
        )
        cmd_errors += f"Unknown validation type: {custom_query_validation_type} passed with config['type'] == 'sql'!"
        return add_conn, cmd_errors

    additional_validation_flags = get_additional_validation_flags(
        ValidationEntity.File,
        sql_file,
        validation_type_flags_mapping[custom_query_validation_type],
        validation_params_from_gcs,
    )
    custom_validation_command += additional_validation_flags
    print(f"DVT command to be executed: {custom_validation_command}")
    dvt_bash = " ) && ( ".join(
        ["( " + add_conn, copy_sql_command, custom_validation_command + " )"]
    )
    return dvt_bash, cmd_errors