src/datamigration/dags/datamigration_utils/hive_bq_load_utils.py
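# Standard-library imports used below; helper functions such as read_config_file,
# read_pd_from_gcs, get_table_size, file_format_subcmd_2, truncate_or_drop_tbl,
# get_job_status, and save_load_status_bq, along with the constants module, are
# assumed to be provided elsewhere in the datamigration_utils package.
import ast
import subprocess
from datetime import datetime
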
def load_bq_tables(tbl, config, op_load_dtm):
    """
    Load a single Hive table's staged GCS files into BigQuery using the bq load command.
    """
    start = datetime.now()
    print(f"Loading Table: {tbl} Start Time: {start}")
    # Parse the serialized run configuration.
    config_dict = read_config_file(ast.literal_eval(config))
    op_run_id = config_dict["unique_id"]
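    # Load the cached Hive metadata DataFrames (table properties, partition/clustering
    # columns, and TEXT-format schemas) from the temp GCS bucket.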
    df_hive_tbls = read_pd_from_gcs(
        config_dict["temp_bucket"],
        constants.df_hive_tables_path.format(
            hive_db_name=config_dict["hive_db_name"]
        ),
    )
    df_partition_clustering = read_pd_from_gcs(
        config_dict["temp_bucket"],
        constants.df_partition_clustering_path.format(
            hive_db_name=config_dict["hive_db_name"]
        ),
    )
    df_text_format_schema = read_pd_from_gcs(
        config_dict["temp_bucket"],
        constants.df_text_format_schema_path.format(
            hive_db_name=config_dict["hive_db_name"]
        ),
    )
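    # Look up this table's load attributes (partitioning, file format, delimiter)
    # and its staging location on GCS.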
    tbl_meta = df_hive_tbls[df_hive_tbls["table"] == tbl]
    partition_flag = tbl_meta["partition_flag"].values[0].upper()
    file_format = tbl_meta["format"].values[0].upper()
    field_delimiter = tbl_meta["field_delimiter"].values[0]
    table_gs_path = constants.hive_tbl_gcs_path.format(
        bkt_id=config_dict["hive_gcs_staging_bucket_id"],
        path=config_dict["hive_gcs_staging_path"],
        tbl=tbl,
    )
    # TODO: filter out tables larger than 15 TB
    table_size_status = get_table_size(tbl, table_gs_path)
    if table_size_status:
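        # For partitioned tables, build the hive-partitioning and clustering
        # sub-commands; non-partitioned tables get empty placeholders.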
        if partition_flag == "Y":
            partition_column = df_partition_clustering[
                (df_partition_clustering["table_name"] == tbl)
            ]["partition_column"].values[0]
            clustering_column = df_partition_clustering[
                (df_partition_clustering["table_name"] == tbl)
            ]["clustering_column"].values[0]
            hive_partition_source_uri = (
                " --hive_partitioning_source_uri_prefix={table_gs_path}"
            ).format(table_gs_path=table_gs_path)
            pccmd = partition_cluster_col_subcmd_1(
                partition_column, clustering_column, file_format
            )
        else:
            pccmd = ""
            hive_partition_source_uri = ""
            print(f"Table {tbl} is not partitioned")
        (
            formatcmd,
            checkformat,
            droptable,
            text_tbl_schema_string,
        ) = file_format_subcmd_2(
            file_format, partition_flag, field_delimiter, tbl, df_text_format_schema
        )
        if checkformat == 1:
            truncate_or_drop_tbl(tbl, droptable, config_dict)
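            # Assemble and run the bq load command for this table.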
            bqloadcmd = (
                f"bq load --source_format={file_format} {formatcmd} "
                f"{pccmd} --project_id={config_dict['project_id']} "
                f"{hive_partition_source_uri} {config_dict['bq_dataset']}.{tbl} "
                f"{table_gs_path}/* "
                f"{text_tbl_schema_string}"
            )
            print(bqloadcmd)
            result = subprocess.run(
                bqloadcmd, capture_output=True, shell=True, encoding="utf-8"
            )
            load_status, reason_for_failure, bq_job_id = get_job_status(tbl, result)
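            # Persist the load status for this table and run.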
            save_load_status_bq(
                tbl,
                load_status,
                reason_for_failure,
                bq_job_id,
                config_dict,
                op_load_dtm,
                op_run_id,
            )
        else:
            print(f"Incorrect source file format for table: {tbl}")
            save_load_status_bq(
                tbl,
                "FAIL",
                "Incorrect Source Format",
                "NA",
                config_dict,
                op_load_dtm,
                op_run_id,
            )
        end = datetime.now()
        print(f"\nTotal time taken to load the table: {end - start}")
    else:
        print(f"Skipping load for table {tbl}: it is larger than 16TB")
        save_load_status_bq(
            tbl,
            "FAIL",
            "Cannot load table greater than 16TB",
            "NA",
            config_dict,
            op_load_dtm,
            op_run_id,
        )