def _get_table_or_file_list()

in src/translation/dags/validation_crun_dag.py [0:0]


def _get_table_or_file_list(input_json):
    """Build one validation request dict per table or SQL file.

    Every returned dict carries the parsed ``config`` and the validation
    parameters loaded via ``gcs_util``, plus one item-specific key:
    ``"table"`` for translation types ``ddl``/``data``/``dml`` or
    ``"sql_file"`` for translation type ``sql``.

    Item selection depends on ``config["validation_only"]``:

    * ``"yes"`` -- items are derived from the validation params: for table
      types each entry becomes ``"<source-table>=<target-table>"``; for
      ``sql`` the keys of the params mapping are used as the files.
    * anything else -- items come from ``input_json["table_list"]`` or
      ``input_json["files"]`` respectively.

    Args:
        input_json: A mapping (or its Python-literal string form) that
            contains ``"config"`` and, unless running validation-only,
            ``"table_list"`` or ``"files"``.

    Returns:
        A list of request dicts, one per table mapping or SQL file.

    Raises:
        AirflowFailException: If ``config["type"]`` is not one of
            ``ddl``, ``data``, ``dml`` or ``sql``.
    """
    # Round-tripping through str() + literal_eval accepts both an actual
    # mapping and its stringified Python-literal form.
    # NOTE(review): presumably needed because upstream values arrive as
    # strings (e.g. via XCom/templating) -- confirm against the caller.
    input_json = ast.literal_eval(str(input_json))
    config = ast.literal_eval(str(input_json["config"]))

    translation_type = config["type"]
    validation_config = config["validation_config"]
    validation_type = validation_config["validation_type"]
    validation_only = config["validation_only"]
    validation_params_file_path = validation_config["validation_params_file_path"]

    bucket_name, blob_name = gcs_util.parse_bucket_and_blob_from_path(
        validation_params_file_path
    )
    validation_params_from_gcs = gcs_util.get_validation_params_from_gcs(
        bucket_name, blob_name, translation_type, validation_type
    )

    # Fields shared by every request. Each request below is a *shallow*
    # copy, so "config" and the validation params are shared objects.
    common_request_json = {
        "config": config,
        "validation_params_from_gcs": validation_params_from_gcs,
    }
    use_validation_params = validation_only == "yes"

    if translation_type in ("ddl", "data", "dml"):
        request_key = "table"
        if use_validation_params:
            items = [
                validation_params_from_gcs[key]["source-table"]
                + "="
                + validation_params_from_gcs[key]["target-table"]
                for key in validation_params_from_gcs
            ]
        else:
            items = input_json["table_list"]
    elif translation_type == "sql":
        request_key = "sql_file"
        if use_validation_params:
            items = list(validation_params_from_gcs)
        else:
            items = input_json["files"]
    else:
        # The value being rejected is the translation type, so the log line
        # must say so (it previously claimed "validation type").
        print(f"Unknown translation type: {translation_type}")
        raise AirflowFailException(f"Unknown translation type: {translation_type}")

    return [{**common_request_json, request_key: item} for item in items]