def get_hive_ddls()

in src/translation/scripts/hive/extract_hive_ddls_manual.py [0:0]


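# Depends on module-level imports (datetime, pandas as pd) and on the helper
# functions get_table_list, get_paths, WriteToLocal, get_table_format,
# get_partition_cluster_info and get_tbl_delimiter available in the surrounding
# extract_hive_ddls_manual.py module.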
def get_hive_ddls(dict, run_id, spark):
    """
    extract HIVE DDls and table metadata
    """
    metadata = []
    run_time = datetime.now()
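    # Confirm the source Hive database exists before extracting anything
    # (checked through the underlying JVM catalog).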
    dbCheck = spark.catalog._jcatalog.databaseExists(dict["hive_db"])
    hive_db = dict["hive_db"]
    bq_dataset = dict["bq_dataset"]
    if dbCheck:
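        # Resolve the tables to extract and the output locations for this run.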
        table_list = get_table_list(dict, spark)
        ddl_path, metadata_path = get_paths(dict, run_time)
        for tbl in table_list:
            print(f"Extracting DDL for Table {tbl}")
            ddl_hive = ""
            try:
                ddl_hive_df = spark.sql(f"show create table {hive_db}.{tbl} as serde")
                ddl_hive = (
                    ddl_hive_df.first()[0]
                    .split("\nLOCATION '")[0]
                    .split("\nSTORED AS")[0]
                )
            except Exception:
                print(f"Could not get DDL for table: {tbl}, trying without SERDE now..")
            if not ddl_hive:
                # Fallback: plain SHOW CREATE TABLE (Spark DDL), keeping only the
                # portion before the USING clause.
                try:
                    ddl_hive_df = spark.sql(f"show create table {hive_db}.{tbl}")
                    ddl_hive = ddl_hive_df.first()[0].split("\nUSING ")[0]
                except Exception as e:
                    print(e)
            if ddl_hive:
                # Qualify the table name, make the statement idempotent, and
                # terminate it with a semicolon.
                ddl_hive = (
                    ddl_hive.replace(f"{hive_db}.", "").replace(
                        f" TABLE {tbl}", f" TABLE IF NOT EXISTS {hive_db}.{tbl}"
                    )
                    + ";"
                )
                WriteToLocal(ddl_hive, ddl_path, tbl)
                storage_format = get_table_format(tbl, hive_db, spark)
                partition_flag, cluster_flag = get_partition_cluster_info(ddl_hive)
                # Only delimited (CSV) tables carry a meaningful field delimiter.
                if storage_format == "CSV":
                    field_delimiter = get_tbl_delimiter(ddl_hive_df.first()[0])
                else:
                    field_delimiter = "NA"
                ddl_extracted = "YES"
            else:
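                # DDL could not be extracted for this table; record placeholder values.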
                (
                    ddl_extracted,
                    partition_flag,
                    cluster_flag,
                    storage_format,
                    field_delimiter,
                ) = ("NO", "", "", "", "")

            metadata.append(
                [
                    run_id,
                    run_time,
                    hive_db,
                    bq_dataset,
                    tbl,
                    field_delimiter,
                    partition_flag,
                    cluster_flag,
                    storage_format,
                    ddl_extracted,
                ]
            )

        # Build the metadata report once, after all tables have been processed.
        df = pd.DataFrame(
            metadata,
            columns=[
                "run_id",
                "start_time",
                "database",
                "bq_dataset",
                "table",
                "field_delimiter",
                "partition_flag",
                "cluster_flag",
                "format",
                "ddl_extracted",
            ],
        )
        df["start_time"] = df["start_time"].astype(
            "string"
        )  # .astype("datetime64[ns]")
        df["run_id"] = df["run_id"].astype("string")
        df.to_csv(
            metadata_path + f"/{hive_db}.csv", index=False, sep=str("\t"), header=True
        )
        # df.to_parquet(metadata_path+f"/{hive_db}.parquet",index=False)
        print("Extracted DDL path: " + ddl_path)
        print("Metadata Path: " + metadata_path)
        spark.stop()
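
A minimal invocation sketch, assuming a Hive-enabled SparkSession. The config keys "hive_db" and "bq_dataset" match the lookups above; the example values, the app name, and the uuid-based run_id are illustrative only.

import uuid

from pyspark.sql import SparkSession

# Hypothetical driver code for get_hive_ddls (example values throughout).
spark = (
    SparkSession.builder.appName("extract_hive_ddls")
    .enableHiveSupport()  # lets SHOW CREATE TABLE resolve tables in the Hive metastore
    .getOrCreate()
)
config = {"hive_db": "sales_db", "bq_dataset": "sales_dataset"}
run_id = str(uuid.uuid4())  # any unique identifier for this extraction run

# Writes the per-table DDL files and the metadata CSV, then stops the SparkSession.
get_hive_ddls(config, run_id, spark)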