in classify-split-extract-workflow/classify-job/gcs_helper.py
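For context, a minimal sketch of the module-level setup this function depends on, assuming the standard google-cloud-storage client; the constant values and the split_uri_2_path_filename body are illustrative assumptions, and only the names come from the function below:

import logging
import os
from typing import List, Tuple

from google.cloud import storage

logger = logging.getLogger(__name__)
storage_client = storage.Client()

# Assumed values for illustration; the real module defines these elsewhere.
START_PIPELINE_FILENAME = "START_PIPELINE"  # trigger file that switches to batch mode
PDF_EXTENSION = ".pdf"
MIME_TYPES = {"application/pdf"}            # mime types the pipeline accepts
SPLITTER_OUTPUT_DIR = "splitter_output"     # folder whose contents are skipped

def split_uri_2_path_filename(file_uri: str) -> Tuple[str, str]:
    """Split "dir1/dir2/doc.pdf" into ("dir1/dir2", "doc.pdf") -- assumed helper."""
    return os.path.split(file_uri)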
def get_list_of_uris(bucket_name: str, file_uri: str) -> List[str]:
    """Returns the gs:// URIs of the documents to process.

    If file_uri names a regular file, the list holds just that file (when its
    type is supported). If file_uri names the START_PIPELINE trigger file,
    every supported document in the same sub-folder (or, at the bucket root,
    the whole bucket) is listed instead.
    """
    logger.info(
        f"Getting list of URIs for bucket=[{bucket_name}] and file=[{file_uri}]"
    )
    uri_list: List[str] = []
    if not file_uri:
        logger.warning("No file URI provided")
        return uri_list

    dirs, filename = split_uri_2_path_filename(file_uri)
    if filename != START_PIPELINE_FILENAME:
        # Single-file processing.
        bucket = storage_client.bucket(bucket_name)
        # bucket.blob() does not fetch object metadata, so content_type may be
        # None here; the extension fallback below covers that case.
        blob = bucket.blob(file_uri)
        mime_type = blob.content_type
        if (mime_type and mime_type in MIME_TYPES) or (
            not mime_type and filename.lower().endswith(PDF_EXTENSION.lower())
        ):
            logger.info(f"Handling single file {file_uri}")
            uri_list.append(f"gs://{bucket_name}/{file_uri}")
        else:
            logger.info(f"Skipping {file_uri} - unsupported mime type: {mime_type}")
    else:
        # Batch processing: list every blob under the trigger file's folder.
        logger.info(
            f"Starting pipeline to process documents inside"
            f" bucket=[{bucket_name}] and sub-folder=[{dirs}]"
        )
        if not dirs:
            blob_list = storage_client.list_blobs(bucket_name)
        else:
            blob_list = storage_client.list_blobs(bucket_name, prefix=dirs + "/")
        count = 0
        for blob in blob_list:
            if (
                blob.name
                and not blob.name.endswith("/")  # skip folder placeholders
                # Compare basenames so the trigger file is also excluded in
                # sub-folders, not only at the bucket root.
                and os.path.basename(blob.name) != START_PIPELINE_FILENAME
                and not os.path.dirname(blob.name).endswith(SPLITTER_OUTPUT_DIR)
            ):
                count += 1
                f_uri = f"gs://{bucket_name}/{blob.name}"
                logger.info(f"Handling document #{count} - {f_uri}")
                mime_type = blob.content_type
                if mime_type not in MIME_TYPES:
                    logger.info(
                        f"Skipping {f_uri} - unsupported mime type: {mime_type}"
                    )
                    continue
                uri_list.append(f_uri)
    return uri_list
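A hedged usage sketch; the bucket and object names are hypothetical:

if __name__ == "__main__":
    # Single-file mode: the URI names one document.
    print(get_list_of_uris("my-intake-bucket", "invoices/2024/scan-001.pdf"))

    # Batch mode: the URI names the START_PIPELINE trigger file, so every
    # supported document under invoices/2024/ is returned instead.
    print(get_list_of_uris("my-intake-bucket", f"invoices/2024/{START_PIPELINE_FILENAME}"))

Dropping an empty START_PIPELINE file into a folder is what flips the function from single-file to batch mode, which lets one GCS event handler serve both cases.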