src/translation/scripts/hive/extract_hive_ddls.py
from datetime import datetime

from google.cloud import bigquery

# get_table_list, WriteToCloud, get_table_format, get_partition_cluster_info,
# and get_tbl_delimiter are assumed to be defined elsewhere in this module
# (or imported alongside it).


def get_hive_ddls(dict, run_id, spark):
    """
    Extract Hive DDLs and table metadata: write each table's DDL to GCS and
    record per-table extraction metadata in a BigQuery audit table.
    """
    run_time = datetime.now()
    dbCheck = spark.catalog._jcatalog.databaseExists(dict["hive_db"])
    hive_db = dict["hive_db"]
    bq_dataset = dict["bq_dataset"]
    if dbCheck:
        table_list = get_table_list(dict, spark)
        # Reuse one BigQuery client for all audit-row inserts.
        client = bigquery.Client()
        for tbl in table_list:
            print(f"Extracting DDL for table {tbl}")
            ddl_hive = ""
            # First try the SERDE form of the DDL; the LOCATION and
            # STORED AS clauses are dropped since they are not needed.
            try:
                ddl_hive_df = spark.sql(f"show create table {hive_db}.{tbl} as serde")
                ddl_hive = (
                    ddl_hive_df.first()[0]
                    .split("\nLOCATION '")[0]
                    .split("\nSTORED AS")[0]
                )
            except Exception:
                print(f"Could not get DDL for table {tbl}, retrying without SERDE...")
            # Fall back to the plain (Spark SQL) DDL if the SERDE form failed.
            if len(ddl_hive) < 1:
                try:
                    ddl_hive_df = spark.sql(f"show create table {hive_db}.{tbl}")
                    ddl_hive = ddl_hive_df.first()[0].split("\nUSING ")[0]
                except Exception as e:
                    print(e)
            if len(ddl_hive) > 1:
                # Make the statement idempotent and fully qualify the table
                # name before writing the DDL to GCS.
                ddl_hive = (
                    ddl_hive.replace(f"{hive_db}.", "").replace(
                        f" TABLE {tbl}", f" TABLE IF NOT EXISTS {hive_db}.{tbl}"
                    )
                    + ";"
                )
                WriteToCloud(
                    ddl_hive, dict["bucket_name"], dict["gcs_ddl_output_path"], tbl
                )
                storage_format = get_table_format(tbl, hive_db, spark)
                partition_flag, cluster_flag = get_partition_cluster_info(ddl_hive)
                # The field delimiter only applies to delimited (CSV) tables.
                if storage_format == "CSV":
                    field_delimiter = get_tbl_delimiter(ddl_hive_df.first()[0])
                else:
                    field_delimiter = "NA"
                ddl_extracted = "YES"
            else:
                # DDL extraction failed; record an empty metadata row.
                (
                    ddl_extracted,
                    partition_flag,
                    cluster_flag,
                    storage_format,
                    field_delimiter,
                ) = ("NO", "", "", "", "")
            metadata_list = [
                {
                    "run_id": run_id,
                    "start_time": str(run_time),
                    "database": hive_db,
                    "bq_dataset": bq_dataset,
                    "table": tbl,
                    "field_delimiter": field_delimiter,
                    "partition_flag": partition_flag,
                    "cluster_flag": cluster_flag,
                    "format": storage_format,
                    "ddl_extracted": ddl_extracted,
                }
            ]
            # Append this table's audit row to the BigQuery audit table.
            errors = client.insert_rows_json(
                dict["bq_dataset_audit"] + "." + dict["bigquery_audit_table"],
                metadata_list,
            )
            if errors:
                print(f"Failed to insert audit row for {tbl}: {errors}")
    else:
        print(f"Hive database {hive_db} does not exist; nothing to extract.")
    spark.stop()
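

# Illustrative driver sketch (an assumption, not part of the original script):
# it shows how get_hive_ddls() might be invoked. The config keys mirror the
# lookups performed above; the SparkSession setup, run_id, and all literal
# values (bucket, datasets, table names) are hypothetical placeholders.
if __name__ == "__main__":
    from pyspark.sql import SparkSession

    spark_session = (
        SparkSession.builder.appName("extract_hive_ddls")
        .enableHiveSupport()
        .getOrCreate()
    )
    example_config = {
        "hive_db": "sales_db",                     # hypothetical Hive database
        "bq_dataset": "sales_dataset",             # target BigQuery dataset
        "bucket_name": "my-migration-bucket",      # GCS bucket for DDL output
        "gcs_ddl_output_path": "ddl/hive",         # prefix within the bucket
        "bq_dataset_audit": "migration_audit",     # dataset holding the audit table
        "bigquery_audit_table": "hive_ddl_audit",  # audit table name
    }
    get_hive_ddls(example_config, run_id="run_001", spark=spark_session)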