def get_text_format_schema()

in src/datamigration/dags/datamigration_utils/hive_bq_load_utils_inc.py [0:0]


def get_text_format_schema(config):
    """
    Get schema of tables with TEXT file format
    """
    client = bigquery.Client()
    dict = read_config_file(ast.literal_eval(config))
    dt = dict["dt"]
    df = read_pd_from_gcs(
        dict["temp_bucket"],
        constants.df_inc_table_list_metadata.format(dt=dt),
    )
    database_list_array = df["bq_dataset"].unique()
    database_list = sorted(database_list_array)
    print(database_list)
    df_list = []
    for dbname in database_list:
        hive_tables = (
            "'"
            + "','".join(df.loc[df["bq_dataset"] == dbname]["table"].values.tolist())
            + "'"
        )
        query = constants.query_inc_tbl_text_format_schema_info.format(
            bq_dataset_name=dbname, table_names=hive_tables
        )
        print(query)
        text_table_sub_df = client.query(query).to_dataframe()
        df_list.append(text_table_sub_df)
    if len(df_list) > 0:
        text_table_df = pd.concat(df_list)
        write_pd_to_gcs(
            text_table_df,
            dict["temp_bucket"],
            constants.df_text_format_schema_inc_tbl_path.format(dt=dt),
        )
        print(text_table_df)
    else:
        print(
            "No database / partition clustering table list found. So nothing to concatenate"
        )