def get_hive_ddls()

in src/translation/scripts/hive/extract_hive_ddls.py


from datetime import datetime

from google.cloud import bigquery

# get_table_list, WriteToCloud, get_table_format, get_partition_cluster_info
# and get_tbl_delimiter are defined or imported elsewhere in this module.


def get_hive_ddls(dict, run_id, spark):
    """
    Extract Hive DDLs and table metadata.

    For each table in the configured Hive database, pull its DDL via
    SHOW CREATE TABLE, write the cleaned DDL to GCS, and record extraction
    metadata in a BigQuery audit table.
    """
    run_time = datetime.now()
    hive_db = dict["hive_db"]
    bq_dataset = dict["bq_dataset"]
    # databaseExists is reached through the JVM catalog handle for
    # compatibility with older PySpark versions.
    db_exists = spark.catalog._jcatalog.databaseExists(hive_db)
    if db_exists:
        table_list = get_table_list(dict, spark)
        # Reuse one BigQuery client for all audit-row inserts.
        client = bigquery.Client()
        for tbl in table_list:
            print(f"Extracting DDL for Table {tbl}")
            ddl_hive = ""
            try:
                ddl_hive_df = spark.sql(f"show create table {hive_db}.{tbl} as serde")
                ddl_hive = (
                    ddl_hive_df.first()[0]
                    .split("\nLOCATION '")[0]
                    .split("\nSTORED AS")[0]
                )
            except Exception:
                print(f"Could not get DDL for table: {tbl}, trying without SERDE now..")
            if len(ddl_hive) < 1:
                try:
                    ddl_hive_df = spark.sql(f"show create table  {hive_db}.{tbl}")
                    ddl_hive = ddl_hive_df.first()[0].split("\nUSING ")[0]
                except Exception as e:
                    print(e)
            if ddl_hive:
                # Strip the database qualifier, then re-qualify the CREATE
                # statement and make it idempotent.
                ddl_hive = (
                    ddl_hive.replace(f"{hive_db}.", "").replace(
                        f" TABLE {tbl}", f" TABLE IF NOT EXISTS {hive_db}.{tbl}"
                    )
                    + ";"
                )
                WriteToCloud(
                    ddl_hive, dict["bucket_name"], dict["gcs_ddl_output_path"], tbl
                )
                storage_format = get_table_format(tbl, hive_db, spark)
                partition_flag, cluster_flag = get_partition_cluster_info(ddl_hive)
                # A field delimiter is only meaningful for delimited (CSV) tables.
                if storage_format == "CSV":
                    field_delimiter = get_tbl_delimiter(ddl_hive_df.first()[0])
                else:
                    field_delimiter = "NA"
                ddl_extracted = "YES"
            else:
                (
                    ddl_extracted,
                    partition_flag,
                    cluster_flag,
                    storage_format,
                    field_delimiter,
                ) = ("NO", "", "", "", "")

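            # One audit row per table, recording what was (or was not) extracted.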
            metadata_list = [
                {
                    "run_id": run_id,
                    "start_time": str(run_time),
                    "database": hive_db,
                    "bq_dataset": bq_dataset,
                    "table": tbl,
                    "field_delimiter": field_delimiter,
                    "partition_flag": partition_flag,
                    "cluster_flag": cluster_flag,
                    "format": storage_format,
                    "ddl_extracted": ddl_extracted,
                }
            ]

            errors = client.insert_rows_json(
                dict["bq_dataset_audit"] + "." + dict["bigquery_audit_table"],
                metadata_list,
            )
            if errors:
                print(f"BigQuery audit insert errors for {tbl}: {errors}")
        spark.stop()
    else:
        print(f"Hive database {hive_db} does not exist, nothing to extract")
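
A minimal invocation sketch, assuming a Hive-enabled SparkSession and a config dict carrying the keys this function reads (get_table_list may require additional keys). The database, bucket and table names below are placeholders, and the run_id format is purely illustrative.

import uuid

from pyspark.sql import SparkSession

if __name__ == "__main__":
    # Hive support is required so SHOW CREATE TABLE can see the metastore.
    spark = (
        SparkSession.builder.appName("extract_hive_ddls")
        .enableHiveSupport()
        .getOrCreate()
    )
    # Placeholder configuration; only the keys read by get_hive_ddls are shown.
    config = {
        "hive_db": "sales_db",
        "bq_dataset": "sales_dataset",
        "bucket_name": "my-migration-bucket",
        "gcs_ddl_output_path": "hive_ddls/",
        "bq_dataset_audit": "migration_audit",
        "bigquery_audit_table": "hive_ddl_extraction_audit",
    }
    get_hive_ddls(config, run_id=str(uuid.uuid4()), spark=spark)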