def get_hive_tables()

in src/datamigration/dags/datamigration_utils/hive_bq_load_utils.py


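This is an excerpt, so the module-level context it relies on is not shown. A minimal sketch of that context follows: the `ast` and `google.cloud.bigquery` imports follow directly from the calls in the body, while `constants`, `read_config_file`, and `write_pd_to_gcs` are defined in or imported by hive_bq_load_utils.py and are not reproduced here.

# Assumed module-level context for this excerpt (not part of the function itself).
import ast

from google.cloud import bigquery

# constants, read_config_file, and write_pd_to_gcs are defined in or imported by
# this module (hive_bq_load_utils.py) and are not shown in this excerpt.
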
def get_hive_tables(config):
    """
    Get Hive tables, including their format and partition info, from the
    hive_ddl_metadata table and stage the result in GCS for the load tasks.
    """
    config_dict = read_config_file(ast.literal_eval(config))
    # Fresh run (rerun_flag == "N"): pull tables from the Hive DDL metadata table.
    # Rerun: use the query variant that also references the BQ load audit table.
    if config_dict["rerun_flag"] == "N":
        query = constants.query_rerun_n.format(
            bq_dataset_audit=config_dict["bq_dataset_audit"],
            hive_ddl_metadata=config_dict["hive_ddl_metadata"],
            hive_db_name=config_dict["hive_db_name"],
        )
    else:
        query = constants.query_rerun_y.format(
            bq_dataset_audit=config_dict["bq_dataset_audit"],
            hive_ddl_metadata=config_dict["hive_ddl_metadata"],
            hive_db_name=config_dict["hive_db_name"],
            bq_load_audit=config_dict["bq_load_audit"],
        )
    print(query)
    client = bigquery.Client()
    query_job_df1 = client.query(query).to_dataframe()
    print(query_job_df1)
    # Second query: DVT validation results when DVT checks are enabled,
    # otherwise the schema results table.
    if config_dict["dvt_check_flag"] == "Y":
        query = constants.query_dvt_y.format(
            bq_dataset_audit=config_dict["bq_dataset_audit"],
            dvt_results=config_dict["dvt_results"],
        )
    else:
        query = constants.query_dvt_n.format(
            bq_dataset_audit=config_dict["bq_dataset_audit"],
            schema_results_tbl=config_dict["schema_results_tbl"],
        )
    print(query)
    query_job_df2 = client.query(query).to_dataframe()
    # Join the two result sets on the table name and write the combined
    # DataFrame to the temp bucket for downstream tasks to consume.
    query_job_df = query_job_df1.merge(query_job_df2, on="table")
    write_pd_to_gcs(
        query_job_df,
        config_dict["temp_bucket"],
        constants.df_hive_tables_path.format(hive_db_name=config_dict["hive_db_name"]),
    )