in src/datamigration/dags/datamigration_utils/hive_bq_load_utils_inc.py
def copy_inc_files(tbl_data, config_data, job_run_time):
"""
Copy incremental data to temporary bucket
"""
print("Table Data: " + tbl_data)
load_start_time = datetime.now()
    # Parse the serialized config; config_dict avoids shadowing the built-in dict.
    config_dict = read_config_file(ast.literal_eval(config_data))
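    # tbl_data appears to be a comma-separated string of the form
    # "<hive_db>.<hive_table>,<bq_table>,<source_gcs_path>", based on the indexing below.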
tbl_data_lst = tbl_data.split(",")
hive_db = tbl_data_lst[0].split(".")[0]
bq_table = tbl_data_lst[1]
source_gcs_path = tbl_data_lst[2]
    hive_gcs_staging_path = config_dict["hive_gcs_staging_path"]
    source_bucket_name = config_dict["source_bucket_name"]
    gcs_inc_staging_bucket = config_dict["gcs_inc_staging_bucket"]
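    # Look up the target BigQuery dataset for this Hive table from the Hive DDL
    # metadata table in the audit dataset.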
    get_bq_dataset_query = constants.query_get_bq_dataset.format(
        bq_dataset_audit=config_dict["bq_dataset_audit"],
        hive_ddl_metadata=config_dict["hive_ddl_metadata"],
        table=tbl_data_lst[1].split(".")[1],
        hive_db=hive_db,
    )
print(get_bq_dataset_query)
client = bigquery.Client()
bq_dataset_df = client.query(get_bq_dataset_query).to_dataframe()
if len(bq_dataset_df) > 0:
bq_dataset = bq_dataset_df.values[0][0]
print(
f"BQ Dataset corresponding to table {bq_table} and hive db {hive_db} is {bq_dataset}"
)
        # bq_dataset = config_dict["hive_bq_dataset_mapping"][hive_db.split(".")[0]]
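        # Destination object path: today's date as a prefix, followed by the source
        # path relative to the Hive GCS staging prefix (when one is configured).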
if len(hive_gcs_staging_path) > 0:
destination_gcs_path = (
str(date.today())
+ "/"
+ source_gcs_path.replace(hive_gcs_staging_path + "/", "")
)
else:
destination_gcs_path = str(date.today()) + "/" + source_gcs_path
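        # Copy the file into the incremental staging bucket; a failure is logged and
        # recorded as FAIL in the audit table instead of being raised.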
try:
copy_blob(
source_bucket_name,
source_gcs_path,
gcs_inc_staging_bucket,
destination_gcs_path,
)
file_copy_status = "PASS"
except Exception as e:
print(f"Failed to copy file: gs://{source_bucket_name}/{source_gcs_path}")
print(e)
file_copy_status = "FAIL"
load_end_time = datetime.now()
        # Record the outcome of the copy attempt in the incremental load audit table.
        save_file_copy_status(
            config_dict["unique_id"],
            job_run_time,
            load_start_time,
            load_end_time,
            hive_db,
            bq_dataset,
            bq_table,
            source_bucket_name,
            source_gcs_path,
            gcs_inc_staging_bucket,
            destination_gcs_path,
            file_copy_status,
            config_dict["bq_dataset_audit"],
            config_dict["hive_inc_load_tbl"],
        )
else:
        print(
            f"Could not retrieve BQ dataset id for table {bq_table} and hive db {hive_db} from the hive ddl audit table"
        )
        # load_end_time must also be set here; it is otherwise only assigned in the success branch above.
        load_end_time = datetime.now()
        save_file_copy_status(
            config_dict["unique_id"],
            job_run_time,
            load_start_time,
            load_end_time,
            hive_db,
            "NA",
            bq_table,
            source_bucket_name,
            source_gcs_path,
            gcs_inc_staging_bucket,
            "NA",
            "FAIL",
            config_dict["bq_dataset_audit"],
            config_dict["hive_inc_load_tbl"],
        )
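

# Illustrative sketch only (not part of the original module): copy_blob is imported
# from a shared utility module elsewhere in this package. The hypothetical helper
# below shows how such a single-object GCS copy is typically done with the
# google-cloud-storage client; the name _example_copy_blob and its body are
# assumptions for illustration, not the project's actual implementation.
def _example_copy_blob(
    source_bucket_name, source_blob_name, destination_bucket_name, destination_blob_name
):
    """Copy one object from a source bucket to a destination bucket."""
    from google.cloud import storage

    storage_client = storage.Client()
    source_bucket = storage_client.bucket(source_bucket_name)
    destination_bucket = storage_client.bucket(destination_bucket_name)
    source_blob = source_bucket.blob(source_blob_name)
    # Bucket.copy_blob performs a server-side copy and returns the new blob.
    source_bucket.copy_blob(source_blob, destination_bucket, new_name=destination_blob_name)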