def get_inc_table_list_for_copy()

in src/datamigration/dags/datamigration_utils/hive_bq_load_utils_inc.py [0:0]


def get_inc_table_list_for_copy(config, inc_gcs_files_list):
    """
    Get the list of incremental tables to copy, derived from GCS file paths.

    Expected path: hive_gcs_staging_path/databasename/tablename/*

    Each path is made relative to ``hive_gcs_staging_path`` (when that config
    value is non-empty) and split on "/". The first component, truncated at
    its first ".", is used as the database name and the second component as
    the table name. Both are passed to ``check_bq_table`` together with the
    original, unmodified GCS path; only non-empty results are kept.

    Args:
        config: String holding a Python literal; it is evaluated with
            ``ast.literal_eval`` and handed to ``read_config_file``.
        inc_gcs_files_list: Iterable of GCS path strings to inspect.

    Returns:
        A list of single-element lists, ``[[tbl], ...]``, one per non-empty
        ``check_bq_table`` result. If a staging path is configured and any
        path has fewer than three "/"-separated components after the prefix
        is removed, processing stops and ``[[]]`` is returned. Without a
        staging path such paths are skipped silently.

    Raises:
        KeyError: If the loaded config has no "hive_gcs_staging_path" key.
    """
    config_dict = read_config_file(ast.literal_eval(config))
    hive_gcs_staging_path = config_dict["hive_gcs_staging_path"]
    client = bigquery.Client()
    print("Incremental File list")
    print(inc_gcs_files_list)

    has_staging_path = bool(hive_gcs_staging_path)
    # Loop-invariant prefix that is stripped from every path.
    staging_prefix = hive_gcs_staging_path + "/" if has_staging_path else ""

    tbl_list = []
    for gcs_path in inc_gcs_files_list:
        # NOTE(review): str.replace removes the prefix wherever it occurs in
        # the path, not only at the start; kept as-is to preserve behaviour.
        relative_path = (
            gcs_path.replace(staging_prefix, "") if has_staging_path else gcs_path
        )
        parts = relative_path.split("/")
        if len(parts) <= 2:
            if has_staging_path:
                # A malformed path aborts the whole run in this mode.
                print("Issue in parsing inc GCS string")
                return [[]]
            # No staging path configured: unparsable entries are ignored.
            continue

        dbname = parts[0].split(".")[0]
        tblname = parts[1]
        tbl_gcs_str = check_bq_table(client, gcs_path, dbname, tblname, config_dict)
        if len(tbl_gcs_str) > 0:
            tbl_list.append(tbl_gcs_str)

    # Each entry is wrapped in its own list; callers receive [[tbl], ...].
    return [[tbl] for tbl in tbl_list]