src/translation/scripts/hive/extract_hive_ddls.py [63:144]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    dict["host_ip"] = translation_config["hive_config"]["server_config"]["connection"][
        "host"
    ]
    source_path = translation_config["migrationTask"]["translationConfigDetails"][
        "gcsSourcePath"
    ]
    dict["bucket_name"] = source_path.split("/")[2]
    dict["gcs_ddl_output_path"] = source_path.split("/", 3)[-1]
    nm_map_list = translation_config["migrationTask"]["translationConfigDetails"][
        "nameMappingList"
    ]["name_map"]
    dict["hive_db"] = list(set(d["source"]["schema"] for d in nm_map_list))[0]
    dict["bq_dataset"] = list(set(d["target"]["schema"] for d in nm_map_list))[0]
    input_tables = translation_config["source_ddl_extract_table_list"]
    dict["input_tables_list"] = [x.lower() for x in input_tables.split(",")]
    return dict


def get_table_list(dict, spark):
    """
    Create list of tables to be loaded

    Args:
        dict: Config mapping (the name shadows the builtin but is kept so
            existing keyword callers keep working). "input_tables_list" is
            always read; "hive_db" is read only in the wildcard case.
        spark: Object exposing ``catalog.listTables(db_name)``.

    Returns:
        The configured "input_tables_list" object itself when its first
        entry is not "*"; otherwise a new list with the lower-cased name of
        every table that ``spark.catalog.listTables`` reports for "hive_db".
    """
    requested_tables = dict["input_tables_list"]
    if requested_tables[0] != "*":
        # An explicit list wins, so skip the catalog lookup whose result
        # would be thrown away anyway.
        return requested_tables
    return [tbl.name.lower() for tbl in spark.catalog.listTables(dict["hive_db"])]


def get_table_format(tbl, hive_db, spark):
    """
    Get table format

    Runs ``describe formatted`` for ``hive_db.tbl`` and classifies the table
    from the ``data_type`` value of the row whose ``col_name`` is
    ``InputFormat``.

    Args:
        tbl: Table name, interpolated into the SQL statement as is.
        hive_db: Database name, interpolated into the SQL statement as is.
        spark: Object exposing ``sql(query)`` that returns a DataFrame.

    Returns:
        "AVRO", "PARQUET", "ORC" or "CSV" (for TEXT input formats), matched
        case-insensitively in that priority order; "OTHER" when nothing
        matches or when no InputFormat value is available.
    """
    row = (
        spark.sql(f"describe formatted {hive_db}.{tbl}")
        .filter("col_name == 'InputFormat'")
        .select("data_type")
        .first()
    )
    # first() yields None when the filter matches no row, and the selected
    # cell can itself be null; both cases fall back to "OTHER" instead of
    # failing on [0] / .upper().
    if row is None or row[0] is None:
        return "OTHER"

    input_format = row[0].upper()
    # Order matters: the first marker found in the class name wins.
    format_markers = (
        ("AVRO", "AVRO"),
        ("PARQUET", "PARQUET"),
        ("ORC", "ORC"),
        ("TEXT", "CSV"),
    )
    for marker, table_format in format_markers:
        if marker in input_format:
            return table_format
    return "OTHER"


def get_partition_cluster_info(ddl_hive):
    """
    Report whether a Hive DDL declares partitioning and clustering

    Both checks are plain, case-sensitive substring searches for
    "PARTITIONED BY" and "CLUSTERED BY" in ``ddl_hive``.

    Returns:
        A ``(partitioning_flag, clustering_flag)`` tuple whose members are
        each "Y" or "N".
    """
    has_partitioning = "PARTITIONED BY" in ddl_hive
    has_clustering = "CLUSTERED BY" in ddl_hive
    return ("Y" if has_partitioning else "N", "Y" if has_clustering else "N")


def get_tbl_delimiter(hive_ddl_str):
    """
    Get Field Delimiter for TEXT tables

    Looks for a ``field.delim' = `` entry in the DDL text and takes the
    quoted value that follows it.

    Returns:
        ``repr()`` of the delimiter found in the DDL (so the result carries
        surrounding quotes and escaped control characters), or the raw
        Ctrl-A character (U+0001, the default HIVE table delimiter) when the
        DDL has no ``field.delim`` entry. Note the two cases are not in the
        same form: only the parsed value is repr-wrapped.
    """
    marker = "field.delim' = "
    if marker not in hive_ddl_str:
        return "\001"

    after_marker = hive_ddl_str.split(marker)[1]
    # The value is the text between the first pair of single quotes.
    quoted_value = after_marker.split("'")[1]
    return repr(quoted_value)
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



src/translation/scripts/hive/extract_hive_ddls_manual.py [66:147]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    dict["host_ip"] = translation_config["hive_config"]["server_config"]["connection"][
        "host"
    ]
    source_path = translation_config["migrationTask"]["translationConfigDetails"][
        "gcsSourcePath"
    ]
    dict["bucket_name"] = source_path.split("/")[2]
    dict["gcs_ddl_output_path"] = source_path.split("/", 3)[-1]
    nm_map_list = translation_config["migrationTask"]["translationConfigDetails"][
        "nameMappingList"
    ]["name_map"]
    dict["hive_db"] = list(set(d["source"]["schema"] for d in nm_map_list))[0]
    dict["bq_dataset"] = list(set(d["target"]["schema"] for d in nm_map_list))[0]
    input_tables = translation_config["source_ddl_extract_table_list"]
    dict["input_tables_list"] = [x.lower() for x in input_tables.split(",")]
    return dict


def get_table_list(dict, spark):
    """
    Create list of tables to be loaded

    Args:
        dict: Config mapping (the name shadows the builtin but is kept so
            existing keyword callers keep working). "input_tables_list" is
            always read; "hive_db" is read only in the wildcard case.
        spark: Object exposing ``catalog.listTables(db_name)``.

    Returns:
        The configured "input_tables_list" object itself when its first
        entry is not "*"; otherwise a new list with the lower-cased name of
        every table that ``spark.catalog.listTables`` reports for "hive_db".
    """
    requested_tables = dict["input_tables_list"]
    if requested_tables[0] != "*":
        # An explicit list wins, so skip the catalog lookup whose result
        # would be thrown away anyway.
        return requested_tables
    return [tbl.name.lower() for tbl in spark.catalog.listTables(dict["hive_db"])]


def get_table_format(tbl, hive_db, spark):
    """
    Get table format

    Runs ``describe formatted`` for ``hive_db.tbl`` and classifies the table
    from the ``data_type`` value of the row whose ``col_name`` is
    ``InputFormat``.

    Args:
        tbl: Table name, interpolated into the SQL statement as is.
        hive_db: Database name, interpolated into the SQL statement as is.
        spark: Object exposing ``sql(query)`` that returns a DataFrame.

    Returns:
        "AVRO", "PARQUET", "ORC" or "CSV" (for TEXT input formats), matched
        case-insensitively in that priority order; "OTHER" when nothing
        matches or when no InputFormat value is available.
    """
    row = (
        spark.sql(f"describe formatted {hive_db}.{tbl}")
        .filter("col_name == 'InputFormat'")
        .select("data_type")
        .first()
    )
    # first() yields None when the filter matches no row, and the selected
    # cell can itself be null; both cases fall back to "OTHER" instead of
    # failing on [0] / .upper().
    if row is None or row[0] is None:
        return "OTHER"

    input_format = row[0].upper()
    # Order matters: the first marker found in the class name wins.
    format_markers = (
        ("AVRO", "AVRO"),
        ("PARQUET", "PARQUET"),
        ("ORC", "ORC"),
        ("TEXT", "CSV"),
    )
    for marker, table_format in format_markers:
        if marker in input_format:
            return table_format
    return "OTHER"


def get_partition_cluster_info(ddl_hive):
    """
    Report whether a Hive DDL declares partitioning and clustering

    Both checks are plain, case-sensitive substring searches for
    "PARTITIONED BY" and "CLUSTERED BY" in ``ddl_hive``.

    Returns:
        A ``(partitioning_flag, clustering_flag)`` tuple whose members are
        each "Y" or "N".
    """
    has_partitioning = "PARTITIONED BY" in ddl_hive
    has_clustering = "CLUSTERED BY" in ddl_hive
    return ("Y" if has_partitioning else "N", "Y" if has_clustering else "N")


def get_tbl_delimiter(hive_ddl_str):
    """
    Get Field Delimiter for TEXT tables

    Looks for a ``field.delim' = `` entry in the DDL text and takes the
    quoted value that follows it.

    Returns:
        ``repr()`` of the delimiter found in the DDL (so the result carries
        surrounding quotes and escaped control characters), or the raw
        Ctrl-A character (U+0001, the default HIVE table delimiter) when the
        DDL has no ``field.delim`` entry. Note the two cases are not in the
        same form: only the parsed value is repr-wrapped.
    """
    marker = "field.delim' = "
    if marker not in hive_ddl_str:
        return "\001"

    after_marker = hive_ddl_str.split(marker)[1]
    # The value is the text between the first pair of single quotes.
    quoted_value = after_marker.split("'")[1]
    return repr(quoted_value)
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



