in src/translation/scripts/hive/extract_hive_ddls_manual.py [0:0]
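# Assumes module-level imports and helpers defined elsewhere in this file, e.g.:
# from datetime import datetime
# import pandas as pd
# plus get_table_list, get_paths, WriteToLocal, get_table_format,
# get_partition_cluster_info and get_tbl_delimiter.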
def get_hive_ddls(dict, run_id, spark):
"""
extract HIVE DDls and table metadata
"""
    metadata = []
    run_time = datetime.now()
    dbCheck = spark.catalog._jcatalog.databaseExists(dict["hive_db"])
    hive_db = dict["hive_db"]
    bq_dataset = dict["bq_dataset"]
    if dbCheck:
        table_list = get_table_list(dict, spark)
        ddl_path, metadata_path = get_paths(dict, run_time)
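        # Extract the DDL for every table in the database and collect per-table metadata.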
        for tbl in table_list:
            print(f"Extracting DDL for Table {tbl}")
            ddl_hive = ""
            try:
                ddl_hive_df = spark.sql(f"show create table {hive_db}.{tbl} as serde")
                ddl_hive = (
                    ddl_hive_df.first()[0]
                    .split("\nLOCATION '")[0]
                    .split("\nSTORED AS")[0]
                )
            except Exception:
                print(f"Could not get DDL for table: {tbl}, trying without SERDE now..")
            if len(ddl_hive) < 1:
                try:
                    ddl_hive_df = spark.sql(f"show create table {hive_db}.{tbl}")
                    ddl_hive = ddl_hive_df.first()[0].split("\nUSING ")[0]
                except Exception as e:
                    print(e)
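            # A DDL was extracted: re-qualify the table name, make the statement
            # idempotent with IF NOT EXISTS, and derive table-level metadata.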
            if len(ddl_hive) > 1:
                ddl_hive = (
                    ddl_hive.replace(f"{hive_db}.", "").replace(
                        f" TABLE {tbl}", f" TABLE IF NOT EXISTS {hive_db}.{tbl}"
                    )
                    + ";"
                )
                WriteToLocal(ddl_hive, ddl_path, tbl)
                storage_format = get_table_format(tbl, hive_db, spark)
                partition_flag, cluster_flag = get_partition_cluster_info(ddl_hive)
                if storage_format == "CSV":
                    field_delimiter = get_tbl_delimiter(ddl_hive_df.first()[0])
                else:
                    field_delimiter = "NA"
                ddl_extracted = "YES"
            else:
                (
                    ddl_extracted,
                    partition_flag,
                    cluster_flag,
                    storage_format,
                    field_delimiter,
                ) = ("NO", "", "", "", "")
            metadata.append(
                [
                    run_id,
                    run_time,
                    hive_db,
                    bq_dataset,
                    tbl,
                    field_delimiter,
                    partition_flag,
                    cluster_flag,
                    storage_format,
                    ddl_extracted,
                ]
            )
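        # Persist the per-table metadata as a tab-separated CSV alongside the extracted DDLs.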
        df = pd.DataFrame(
            metadata,
            columns=[
                "run_id",
                "start_time",
                "database",
                "bq_dataset",
                "table",
                "field_delimiter",
                "partition_flag",
                "cluster_flag",
                "format",
                "ddl_extracted",
            ],
        )
        df["start_time"] = df["start_time"].astype(
            "string"
        )  # .astype("datetime64[ns]")
        df["run_id"] = df["run_id"].astype("string")
        df.to_csv(
            metadata_path + f"/{hive_db}.csv", index=False, sep="\t", header=True
        )
        # df.to_parquet(metadata_path+f"/{hive_db}.parquet",index=False)
        print("Extracted DDL path: " + ddl_path)
        print("Metadata Path: " + metadata_path)
    spark.stop()