in src/datamigration/dags/datamigration_utils/hive_bq_load_utils.py [0:0]
def get_hive_tables(config):
"""
Get HIVE tables including format and partition info from hive_ddl_metadata table
"""
    cfg = read_config_file(ast.literal_eval(config))
    # Select the metadata query: the rerun variant (rerun_flag != "N") also
    # references the bq_load_audit table.
    if cfg["rerun_flag"] == "N":
        query = constants.query_rerun_n.format(
            bq_dataset_audit=cfg["bq_dataset_audit"],
            hive_ddl_metadata=cfg["hive_ddl_metadata"],
            hive_db_name=cfg["hive_db_name"],
        )
    else:
        query = constants.query_rerun_y.format(
            bq_dataset_audit=cfg["bq_dataset_audit"],
            hive_ddl_metadata=cfg["hive_ddl_metadata"],
            hive_db_name=cfg["hive_db_name"],
            bq_load_audit=cfg["bq_load_audit"],
        )
    print(query)
    client = bigquery.Client()
    query_job_df1 = client.query(query).to_dataframe()
    print(query_job_df1)
    # Select the validation query: use the DVT results table when the DVT
    # check flag is "Y", otherwise the schema results table.
    if cfg["dvt_check_flag"] == "Y":
        query = constants.query_dvt_y.format(
            bq_dataset_audit=cfg["bq_dataset_audit"], dvt_results=cfg["dvt_results"]
        )
    else:
        query = constants.query_dvt_n.format(
            bq_dataset_audit=cfg["bq_dataset_audit"],
            schema_results_tbl=cfg["schema_results_tbl"],
        )
    print(query)
    query_job_df2 = client.query(query).to_dataframe()
    # Join the two result sets on table name and write the combined dataframe
    # to the temp GCS bucket for downstream tasks.
    query_job_df = query_job_df1.merge(query_job_df2, on="table")
    write_pd_to_gcs(
        query_job_df,
        cfg["temp_bucket"],
        constants.df_hive_tables_path.format(hive_db_name=cfg["hive_db_name"]),
    )
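

# Illustrative sketch (an assumption, not part of the original module): the parsed
# config returned by read_config_file is expected to expose at least the keys
# referenced in get_hive_tables above. The placeholder values below are
# hypothetical; only the key names come from the lookups in the function body.
_EXAMPLE_HIVE_LOAD_CONFIG = {
    "rerun_flag": "N",  # any value other than "N" selects the rerun query, which also uses bq_load_audit
    "dvt_check_flag": "Y",  # "Y" selects the DVT results query instead of the schema results query
    "bq_dataset_audit": "<audit_dataset>",
    "hive_ddl_metadata": "<hive_ddl_metadata_table>",
    "hive_db_name": "<hive_database>",
    "bq_load_audit": "<bq_load_audit_table>",
    "dvt_results": "<dvt_results_table>",
    "schema_results_tbl": "<schema_results_table>",
    "temp_bucket": "<temp_gcs_bucket>",
}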