# incubator-tools/docai_document_processing_pipeline/src/load_queue_cf/main.py
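# Imports assumed by this entry point: `Request` comes from Flask (the request
# type handed to HTTP Cloud Functions); the google-cloud clients are used by the
# illustrative helper sketches at the bottom of this file.
import os

import requests
from flask import Request
from google.cloud import documentai, firestore, storage
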
def load_queue(request: Request) -> tuple[str, int]:
    """HTTP Cloud Function that populates the processing queue and starts document processing.

    Args:
        request: HTTP request object whose JSON payload contains a
            'file_paths' list of GCS URIs (individual files or folder prefixes).

    Returns:
        Tuple of (response message, HTTP status code).

    Note:
        - Validates the payload format and content.
        - Handles both individual files and folder paths.
        - Initiates both batch and sync processing as needed.
        - Returns 400 for invalid requests, 500 for processing errors.
    """
    try:
        # Ensure the request contains a JSON payload
        request_json = request.get_json(silent=True)
        if request_json is None:
            return "Invalid request, no JSON payload found", 400

        # file_paths must be a non-empty list
        file_paths = request_json.get("file_paths")
        if not isinstance(file_paths, list):
            return "file_paths should be a list", 400
        if not file_paths:
            return "file_paths list is empty", 400
        # Expand folder paths into individual file URIs
        files = []
        for path in file_paths:
            if path.endswith("/"):
                # gs://bucket/prefix/ -> bucket_name "bucket", folder_name "prefix/"
                bucket_name = path.split("/")[2]
                folder_name = "/".join(path.split("/")[3:])
                files.extend(list_files_in_gcs_folder(bucket_name, folder_name))
            else:
                files.append(path)
        # Add a record per file to the Firestore queue collection
        populate_queue(files)

        # Trigger the submit_batch Cloud Function to start batch processing
        trigger_batch_processing()

        # Process the files flagged for sync (online) processing
        docs = get_sync_docs()
        for doc in docs:
            file_path = doc.get("file_path")
            print(f"Processing {file_path} ...")
            process_document_sync(file_path)
        return (
            "Queue populated successfully, batch processing triggered and sync processing completed",
            200,
        )
    except Exception as e:
        print(f"Error processing the request: {e}")
        return "Error processing the request", 500