def load_bq_tables()

in src/datamigration/dags/datamigration_utils/hive_bq_load_utils_inc.py
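
For context, a minimal sketch of the module-level imports this function relies on; the helpers read_config_file, read_pd_from_gcs, partition_cluster_col_subcmd_1, file_format_subcmd_2, get_job_status, and save_load_status_bq are assumed to be defined alongside it in datamigration_utils, and the exact import paths are an assumption:

    # Assumed imports (sketch; actual paths in the repo may differ)
    import ast
    import subprocess
    from datetime import datetime

    from datamigration_utils import constants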


def load_bq_tables(concat_db_tbl, config, op_load_dtm, op_run_id):
    """
    Load BigQuery table using BQ load command
    """
    start = datetime.now()
    print(f"Loading Table: {concat_db_tbl} Start Time: {str(start)}")
    # Parse the stringified config and read the per-run metadata DataFrames
    # staged in GCS ("config_dict" avoids shadowing the built-in "dict")
    config_dict = read_config_file(ast.literal_eval(config))
    dt = config_dict["dt"]
    df_hive_tbls = read_pd_from_gcs(
        config_dict["temp_bucket"],
        constants.df_inc_table_list_metadata.format(dt=dt),
    )
    df_partition_clustering = read_pd_from_gcs(
        config_dict["temp_bucket"],
        constants.df_partition_clustering_inc_tbl_path.format(dt=dt),
    )
    df_text_format_schema = read_pd_from_gcs(
        config_dict["temp_bucket"],
        constants.df_text_format_schema_inc_tbl_path.format(dt=dt),
    )

    df_incremental_table_list = read_pd_from_gcs(
        config_dict["temp_bucket"],
        constants.df_inc_tables_list.format(dt=dt),
    )
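    # Pull this table's load properties and staged file paths from the metadata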
    partition_flag = (
        df_hive_tbls[df_hive_tbls["concat_db_tbl"] == concat_db_tbl]["partition_flag"]
        .values[0]
        .upper()
    )
    file_format = (
        df_hive_tbls[df_hive_tbls["concat_db_tbl"] == concat_db_tbl]["format"]
        .values[0]
        .upper()
    )
    hive_db_name = df_hive_tbls[df_hive_tbls["concat_db_tbl"] == concat_db_tbl][
        "database"
    ].values[0]
    field_delimiter = df_hive_tbls[df_hive_tbls["concat_db_tbl"] == concat_db_tbl][
        "field_delimiter"
    ].values[0]
    table_gs_path_list = list(
        df_incremental_table_list[
            df_incremental_table_list["concat_db_tbl"] == concat_db_tbl
        ]["destination_path"].values
    )
    bq_dataset = concat_db_tbl.split(".")[0]
    tbl = concat_db_tbl.split(".")[1]
    print(f"Loading Table: {concat_db_tbl}")
    print(f"partition_flag : {partition_flag}")
    print(f"file_format : {file_format}")
    print(f"bq_dataset : {bq_dataset}")
    print(f"tbl : {tbl}")
    for table_gs_path in table_gs_path_list:
        print(f"Appending {tbl} from: {table_gs_path}".format(tbl, table_gs_path))
        hive_table_gs_path = table_gs_path.split(f"/{tbl}/")[0] + f"/{tbl}"
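        # Partitioned tables need a hive-partitioning URI prefix plus partition/cluster flags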
        if partition_flag == "Y":
            partition_column = df_partition_clustering[
                (df_partition_clustering["concat_db_tbl"] == concat_db_tbl)
            ]["partition_column"].values[0]
            clustering_column = df_partition_clustering[
                (df_partition_clustering["concat_db_tbl"] == concat_db_tbl)
            ]["clustering_column"].values[0]
            hive_partition_source_uri = (
                f" --hive_partitioning_source_uri_prefix={hive_table_gs_path}"
            )
            pccmd = partition_cluster_col_subcmd_1(
                partition_column, clustering_column, file_format
            )
        else:
            pccmd = ""
            hive_partition_source_uri = ""
            print(f"Table {concat_db_tbl} is not partitioned")
        (
            formatcmd,
            checkformat,
            droptable,
            text_tbl_schema_string,
        ) = file_format_subcmd_2(
            file_format,
            partition_flag,
            field_delimiter,
            concat_db_tbl,
            df_text_format_schema,
        )
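        # checkformat == 1 signals a supported source file format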
        if checkformat == 1:
            # result = truncate_or_drop_tbl(tbl, droptable, config_dict)
            bqloadcmd = (
                f"bq load --source_format={file_format} {formatcmd} "
                f"{pccmd} --project_id={config_dict['project_id']} "
                f"{hive_partition_source_uri} {bq_dataset}.{tbl} "
                f"{table_gs_path} "
                f"{text_tbl_schema_string}"
            )
            print(bqloadcmd)
            result = subprocess.run(
                bqloadcmd, capture_output=True, shell=True, encoding="utf-8"
            )
            load_status, reason_for_failure, bq_job_id = get_job_status(tbl, result)
            save_load_status_bq(
                tbl,
                load_status,
                reason_for_failure,
                bq_job_id,
                bq_dataset,
                hive_db_name,
                config_dict,
                op_load_dtm,
                op_run_id,
            )
        else:
            print("Incorrect Source File Format for table: {}".format(tbl))
            save_load_status_bq(
                tbl,
                "FAIL",
                "Incorrect Source Format",
                "NA",
                bq_dataset,
                hive_db_name,
                config_dict,
                op_load_dtm,
                op_run_id,
            )
        end = datetime.now()
        print(f"\nTotal Time Taken to load the table : {end - start}")