in src/datamigration/dags/datamigration_utils/hive_bq_load_utils_inc.py [0:0]
def get_text_format_schema(config):
    """
    Collect BigQuery column-schema info for incremental tables stored in TEXT format.

    Reads the incremental table-list metadata DataFrame from GCS, then for each
    distinct ``bq_dataset`` runs the schema-info query template
    (``constants.query_inc_tbl_text_format_schema_info``) against BigQuery,
    concatenates the per-dataset results, and writes the combined DataFrame
    back to GCS at ``constants.df_text_format_schema_inc_tbl_path``.

    Args:
        config: String representation of the run-config dict; parsed with
            ``ast.literal_eval`` and resolved via ``read_config_file``.
    """
    client = bigquery.Client()
    # Renamed from `dict` — never shadow the builtin.
    run_config = read_config_file(ast.literal_eval(config))
    dt = run_config["dt"]
    df = read_pd_from_gcs(
        run_config["temp_bucket"],
        constants.df_inc_table_list_metadata.format(dt=dt),
    )
    database_list = sorted(df["bq_dataset"].unique())
    print(database_list)
    df_list = []
    for dbname in database_list:
        # Quote each table name for use in the SQL IN-list: 'a','b','c'
        tables = df.loc[df["bq_dataset"] == dbname, "table"].tolist()
        hive_tables = ",".join(f"'{t}'" for t in tables)
        query = constants.query_inc_tbl_text_format_schema_info.format(
            bq_dataset_name=dbname, table_names=hive_tables
        )
        print(query)
        df_list.append(client.query(query).to_dataframe())
    if df_list:
        text_table_df = pd.concat(df_list)
        write_pd_to_gcs(
            text_table_df,
            run_config["temp_bucket"],
            constants.df_text_format_schema_inc_tbl_path.format(dt=dt),
        )
        print(text_table_df)
    else:
        # Message fixed: this function handles TEXT-format tables,
        # not partition/clustering metadata (copy-paste error).
        print("No TEXT format table list found. So nothing to concatenate")