in src/datamigration/dags/datamigration_utils/hive_bq_load_utils_inc.py [0:0]
def get_inc_table_list_for_copy(config, inc_gcs_files_list):
    """
    Get the list of incremental tables to copy, derived from GCS file paths.

    Expected path: hive_gcs_staging_path/databasename/tablename/*

    For each path, the database name is the first "/"-separated segment with
    everything from its first "." onwards dropped, and the table name is the
    second segment. Each parsed path is handed to check_bq_table and every
    non-empty result is collected.

    Args:
        config: Value passed to ast.literal_eval; the evaluated result is
            given to read_config_file to obtain the configuration mapping.
        inc_gcs_files_list: Iterable of GCS path strings.

    Returns:
        A list of single-element lists, one per non-empty check_bq_table
        result, in input order. If hive_gcs_staging_path is non-empty and a
        path has fewer than three "/"-separated segments once the staging
        prefix is removed, processing stops immediately and [[]] is
        returned. If hive_gcs_staging_path is empty, such paths are skipped
        without a message.
    """
    migration_config = read_config_file(ast.literal_eval(config))
    staging_prefix = migration_config["hive_gcs_staging_path"]
    client = bigquery.Client()
    tbl_list = []
    print("Incremental File list")
    print(inc_gcs_files_list)

    # len() rather than truthiness: the config value's type is not visible
    # here, and len() keeps the original failure mode for non-sized values.
    has_staging_prefix = len(staging_prefix) > 0

    for gcs_path in inc_gcs_files_list:
        # NOTE(review): str.replace removes every occurrence of the prefix,
        # not only a leading one. Kept as-is; confirm whether paths can
        # contain the staging prefix anywhere other than at the start.
        relative_path = (
            gcs_path.replace(staging_prefix + "/", "")
            if has_staging_prefix
            else gcs_path
        )
        segments = relative_path.split("/")

        if len(segments) > 2:
            dbname = segments[0].split(".")[0]
            tblname = segments[1]
            # check_bq_table receives the full, unstripped path.
            tbl_gcs_str = check_bq_table(
                client, gcs_path, dbname, tblname, migration_config
            )
            if len(tbl_gcs_str) > 0:
                tbl_list.append(tbl_gcs_str)
        elif has_staging_prefix:
            # Only the staging-prefix mode treats an unparsable path as
            # fatal; without a prefix the path is silently ignored.
            print("Issue in parsing inc GCS string")
            return [[]]

    return [[tbl] for tbl in tbl_list]