def copy_inc_files()

in src/datamigration/dags/datamigration_utils/hive_bq_load_utils_inc.py


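# NOTE: this excerpt omits the module-level imports and helpers it relies on:
# ast, date/datetime, the google.cloud bigquery client, the constants module,
# and the read_config_file / copy_blob / save_file_copy_status utilities are
# imported or defined elsewhere in the surrounding module.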
def copy_inc_files(tbl_data, config_data, job_run_time):
    """
    Copy incremental data to temporary bucket
    """
    print("Table Data: " + tbl_data)
    load_start_time = datetime.now()
    # config_data arrives as a string; parse it and resolve settings via read_config_file.
    config_dict = read_config_file(ast.literal_eval(config_data))
    # tbl_data format: "<hive_db>.<hive_table>,<bq_table>,<source_gcs_path>"
    tbl_data_lst = tbl_data.split(",")
    hive_db = tbl_data_lst[0].split(".")[0]
    bq_table = tbl_data_lst[1]
    source_gcs_path = tbl_data_lst[2]
    hive_gcs_staging_path = config_dict["hive_gcs_staging_path"]
    source_bucket_name = config_dict["source_bucket_name"]
    gcs_inc_staging_bucket = config_dict["gcs_inc_staging_bucket"]
    # Query to look up which BQ dataset this Hive table maps to in the DDL audit metadata.
    get_bq_dataset_query = constants.query_get_bq_dataset.format(
        bq_dataset_audit=config_dict["bq_dataset_audit"],
        hive_ddl_metadata=config_dict["hive_ddl_metadata"],
        table=tbl_data_lst[1].split(".")[1],
        hive_db=hive_db,
    )
    print(get_bq_dataset_query)
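    # Execute the lookup; an empty result means no dataset mapping exists
    # for this table in the audit metadata.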
    client = bigquery.Client()
    bq_dataset_df = client.query(get_bq_dataset_query).to_dataframe()
    if len(bq_dataset_df) > 0:
        bq_dataset = bq_dataset_df.values[0][0]
        print(
            f"BQ Dataset corresponding to table {bq_table} and hive db {hive_db} is {bq_dataset}"
        )
        # bq_dataset = config_dict["hive_bq_dataset_mapping"][hive_db.split(".")[0]]
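        # Build a date-prefixed destination path in the incremental staging bucket,
        # stripping the configured Hive GCS staging prefix from the source path when present.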
        if len(hive_gcs_staging_path) > 0:
            destination_gcs_path = (
                str(date.today())
                + "/"
                + source_gcs_path.replace(hive_gcs_staging_path + "/", "")
            )
        else:
            destination_gcs_path = str(date.today()) + "/" + source_gcs_path
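        # Attempt the copy into the staging bucket; the outcome is recorded
        # in the audit table below.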
        try:
            copy_blob(
                source_bucket_name,
                source_gcs_path,
                gcs_inc_staging_bucket,
                destination_gcs_path,
            )
            file_copy_status = "PASS"
        except Exception as e:
            print(f"Failed to copy file: gs://{source_bucket_name}/{source_gcs_path}")
            print(e)
            file_copy_status = "FAIL"
        load_end_time = datetime.now()
        # Record the outcome of the copy attempt in the incremental-load audit table.
        save_file_copy_status(
            config_dict["unique_id"],
            job_run_time,
            load_start_time,
            load_end_time,
            hive_db,
            bq_dataset,
            bq_table,
            source_bucket_name,
            source_gcs_path,
            gcs_inc_staging_bucket,
            destination_gcs_path,
            file_copy_status,
            config_dict["bq_dataset_audit"],
            config_dict["hive_inc_load_tbl"],
        )
    else:
        print(
            f"Could not retrieve BQ dataset id for table {bq_table} and hive db {hive_db} from the hive ddl audit table"
        )
        # No dataset mapping found: log the miss in the audit table, using "NA"
        # for the unknown dataset and destination path.
        load_end_time = datetime.now()
        save_file_copy_status(
            config_dict["unique_id"],
            job_run_time,
            load_start_time,
            load_end_time,
            hive_db,
            "NA",
            bq_table,
            source_bucket_name,
            source_gcs_path,
            gcs_inc_staging_bucket,
            "NA",
            "FAIL",
            config_dict["bq_dataset_audit"],
            config_dict["hive_inc_load_tbl"],
        )
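
For orientation, below is a minimal sketch of a direct call to this helper. The import path, table descriptor, and config values are assumptions inferred from the parsing logic above, not taken from the repository; in particular, the exact shape of config_data depends on read_config_file, which is outside this excerpt.

from datetime import datetime

# Assumed import path; the module lives under the DAGs folder of this repo.
from datamigration_utils.hive_bq_load_utils_inc import copy_inc_files

# Comma-separated descriptor that copy_inc_files splits on ",":
#   "<hive_db>.<hive_table>,<bq_table>,<source_gcs_path>"  (values are illustrative)
tbl_data = "sales_db.orders,sales_db.orders,landing/sales_db/orders/part-00000"

# Stringified config dict; copy_inc_files parses it with ast.literal_eval and
# passes it to read_config_file. The keys below are the ones the function reads;
# the values are illustrative.
config_data = str(
    {
        "unique_id": "dm_run_001",
        "hive_gcs_staging_path": "landing",
        "source_bucket_name": "source-data-bucket",
        "gcs_inc_staging_bucket": "inc-staging-bucket",
        "bq_dataset_audit": "dm_audit",
        "hive_ddl_metadata": "hive_ddl_metadata",
        "hive_inc_load_tbl": "hive_inc_load_status",
    }
)

# job_run_time is recorded as-is in the audit table; a timestamp is assumed here.
copy_inc_files(tbl_data, config_data, job_run_time=datetime.now())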