in src/translation/dags/validation_dag.py
def parallelize_dvt_tasks(input_json):
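    """Build the list of DVT bash commands to run as parallel validation tasks.

    Reads the translation/validation config from ``input_json``, pulls the
    validation parameters file from GCS, and returns one bash command per
    source/target table pair (ddl, data, dml) or per SQL file (sql).
    """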
bash_cmds_list = []
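    # The config may arrive as a stringified dict, so parse it back into a dict.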
config = ast.literal_eval(str(input_json["config"]))
translation_type = config["type"]
validation_type = config["validation_config"]["validation_type"]
validation_only = config["validation_only"]
validation_params_file_path = config["validation_config"][
"validation_params_file_path"
]
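    # Locate and read the validation parameters file from GCS for this
    # translation type and validation type.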
bucket_name, blob_name = gcs_util.parse_bucket_and_blob_from_path(
validation_params_file_path
)
validation_params_from_gcs = gcs_util.get_validation_params_from_gcs(
bucket_name, blob_name, translation_type, validation_type
)
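    # Table-level translations (ddl/data/dml): one DVT command per
    # source=target table pair.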
if translation_type in ["ddl", "data", "dml"]:
table_list = []
if validation_only == "yes":
for key in validation_params_from_gcs:
source_table = validation_params_from_gcs[key]["source-table"]
target_table = validation_params_from_gcs[key]["target-table"]
table_list.append(source_table + "=" + target_table)
else:
table_list = input_json["table_list"]
for table in table_list:
bash_cmd = get_dvt_cmd_ddl_validation(
config, table, validation_params_from_gcs
)
bash_cmds_list.append(bash_cmd)
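    # SQL translations: one DVT command per translated SQL file.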
elif translation_type == "sql":
files = []
if validation_only == "yes":
for key in validation_params_from_gcs:
files.append(key)
else:
files = input_json["files"]
for file in files:
bash_cmd = get_dvt_cmd_sql_validation(
config, file, validation_params_from_gcs
)
bash_cmds_list.append(bash_cmd)
else:
print(f"Unknown translation type: {translation_type}")
return bash_cmds_list
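# ---------------------------------------------------------------------------
# Illustrative sketch (an assumption, not part of the original DAG): one way
# to fan the returned command list out into parallel tasks is Airflow dynamic
# task mapping (Airflow >= 2.3). The dag_id, task_ids, start_date, and the
# placeholder input below are hypothetical, not taken from this repository.
#
#   from datetime import datetime
#
#   from airflow import DAG
#   from airflow.operators.bash import BashOperator
#   from airflow.operators.python import PythonOperator
#
#   with DAG(
#       dag_id="dvt_validation_example",
#       start_date=datetime(2024, 1, 1),
#       schedule_interval=None,
#   ) as dag:
#       build_cmds = PythonOperator(
#           task_id="build_dvt_cmds",
#           python_callable=parallelize_dvt_tasks,
#           op_kwargs={"input_json": {}},  # placeholder; supplied by the real DAG
#       )
#       # Each bash command in the returned list becomes its own mapped task.
#       BashOperator.partial(task_id="run_dvt").expand(
#           bash_command=build_cmds.output
#       )
# ---------------------------------------------------------------------------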