in src/datamigration/dags/datamigration_utils/hive_bq_load_utils_inc.py [0:0]
def load_bq_tables(concat_db_tbl, config, op_load_dtm, op_run_id):
"""
    Load incremental Hive table data into BigQuery using the bq load CLI.

    Builds and runs one bq load command per incremental extract file for the
    given <dataset>.<table>, then records the outcome via save_load_status_bq.
"""
start = datetime.now()
    print(f"Loading Table: {concat_db_tbl} Start Time: {start}")
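    # Parse the serialized config and read the metadata DataFrames staged in GCS:
    # incremental table metadata, partition/clustering columns, text-format
    # schemas, and the list of incremental extract files.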
dict = read_config_file(ast.literal_eval(config))
dt = dict["dt"]
df_hive_tbls = read_pd_from_gcs(
dict["temp_bucket"],
constants.df_inc_table_list_metadata.format(dt=dt),
)
df_partition_clustering = read_pd_from_gcs(
dict["temp_bucket"],
constants.df_partition_clustering_inc_tbl_path.format(dt=dt),
)
df_text_format_schema = read_pd_from_gcs(
dict["temp_bucket"],
constants.df_text_format_schema_inc_tbl_path.format(dt=dt),
)
df_incremental_table_list = read_pd_from_gcs(
dict["temp_bucket"],
constants.df_inc_tables_list.format(dt=dt),
)
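    # Look up per-table attributes (partition flag, source file format, Hive
    # database, field delimiter) and the GCS paths of the incremental extracts.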
    tbl_meta = df_hive_tbls[df_hive_tbls["concat_db_tbl"] == concat_db_tbl]
    partition_flag = tbl_meta["partition_flag"].values[0].upper()
    file_format = tbl_meta["format"].values[0].upper()
    hive_db_name = tbl_meta["database"].values[0]
    field_delimiter = tbl_meta["field_delimiter"].values[0]
table_gs_path_list = list(
df_incremental_table_list[
df_incremental_table_list["concat_db_tbl"] == concat_db_tbl
]["destination_path"].values
)
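    # concat_db_tbl is expected in "<bq_dataset>.<table_name>" form.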
bq_dataset = concat_db_tbl.split(".")[0]
tbl = concat_db_tbl.split(".")[1]
print(f"Loading Table: {concat_db_tbl}")
print(f"partition_flag : {partition_flag}")
print(f"file_format : {file_format}")
print(f"bq_dataset : {bq_dataset}")
print(f"tbl : {tbl}")
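    # Append each incremental extract file into the target BigQuery table.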
for table_gs_path in table_gs_path_list:
        print(f"Appending {tbl} from: {table_gs_path}")
hive_table_gs_path = table_gs_path.split(f"/{tbl}/")[0] + f"/{tbl}"
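        # For Hive-partitioned tables, derive the partition/clustering columns
        # and the hive-partitioning source URI prefix for the bq load command.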
if partition_flag == "Y":
            pc_meta = df_partition_clustering[
                df_partition_clustering["concat_db_tbl"] == concat_db_tbl
            ]
            partition_column = pc_meta["partition_column"].values[0]
            clustering_column = pc_meta["clustering_column"].values[0]
            hive_partition_source_uri = (
                f" --hive_partitioning_source_uri_prefix={hive_table_gs_path}"
            )
pccmd = partition_cluster_col_subcmd_1(
partition_column, clustering_column, file_format
)
else:
pccmd = ""
hive_partition_source_uri = ""
print(f"Table {concat_db_tbl} is not partitioned")
(
formatcmd,
checkformat,
droptable,
text_tbl_schema_string,
) = file_format_subcmd_2(
file_format,
partition_flag,
field_delimiter,
concat_db_tbl,
df_text_format_schema,
)
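        # checkformat == 1 means the source file format is supported; only then
        # is the bq load command built and executed.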
if checkformat == 1:
# result = truncate_or_drop_tbl(tbl, droptable, dict)
bqloadcmd = (
f"bq load --source_format={file_format} {formatcmd} "
f"{pccmd} --project_id={dict['project_id']} "
f"{hive_partition_source_uri} {bq_dataset}.{tbl} "
f"{table_gs_path} "
f"{text_tbl_schema_string}"
)
print(bqloadcmd)
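            # Run the bq CLI synchronously; stdout/stderr are captured so the
            # job status and any failure reason can be parsed by get_job_status.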
result = subprocess.run(
bqloadcmd, capture_output=True, shell=True, encoding="utf-8"
)
load_status, reason_for_failure, bq_job_id = get_job_status(tbl, result)
save_load_status_bq(
tbl,
load_status,
reason_for_failure,
bq_job_id,
bq_dataset,
hive_db_name,
dict,
op_load_dtm,
op_run_id,
)
else:
            print(f"Incorrect Source File Format for table: {tbl}")
save_load_status_bq(
tbl,
"FAIL",
"Incorrect Source Format",
"NA",
bq_dataset,
hive_db_name,
dict,
op_load_dtm,
op_run_id,
)
    end = datetime.now()
    print(f"\nTotal time taken to load table {concat_db_tbl}: {end - start}")
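
# Illustrative sketch (assumption, not part of the original module): one way this
# helper might be invoked from an Airflow PythonOperator callable. The config
# payload keys and values below are examples only.
#
#   load_bq_tables(
#       concat_db_tbl="sales_ds.orders_inc",
#       config="{'temp_bucket': 'my-temp-bucket', 'project_id': 'my-project', 'dt': '2024-01-01'}",
#       op_load_dtm=datetime.now(),
#       op_run_id="manual__2024-01-01T00:00:00",
#   )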