def get_partition_clustering_info()

in src/datamigration/dags/datamigration_utils/hive_bq_load_utils_inc.py [0:0]


def get_partition_clustering_info(config):
    """
    Get Partitioning and Clustering info about the tables from information schema
    """
    client = bigquery.Client()
    dict = read_config_file(ast.literal_eval(config))
    dt = dict["dt"]
    df = read_pd_from_gcs(
        dict["temp_bucket"],
        constants.df_inc_table_list_metadata.format(dt=dt),
    )
    database_list_array = df["bq_dataset"].unique()
    database_list = sorted(database_list_array)
    print(database_list)
    df_list = []
    for dbname in database_list:
        hive_tables = (
            "'"
            + "','".join(df.loc[df["bq_dataset"] == dbname]["table"].values.tolist())
            + "'"
        )
        query = constants.query_inc_tbl_partition_clustering_info.format(
            bq_dataset_name=dbname, table_names=hive_tables
        )
        print(query)
        partition_cluster_sub_df = client.query(query).to_dataframe()
        df_list.append(partition_cluster_sub_df)
    if len(df_list) > 0:
        partition_cluster_df = pd.concat(df_list)
        write_pd_to_gcs(
            partition_cluster_df,
            dict["temp_bucket"],
            constants.df_partition_clustering_inc_tbl_path.format(dt=dt),
        )
        print(partition_cluster_df)
    else:
        print(
            "No database / partition clustering table list found. So nothing to concatenate"
        )
    return df[["concat_db_tbl"]].values.tolist()