in incubator-tools/docai_document_processing_pipeline/src/load_queue_cf/main.py [0:0]
def process_document_sync(gcs_input_uri: str) -> bool:
    """Process a single document synchronously with Document AI.

    Args:
        gcs_input_uri: GCS URI of the document to process.

    Returns:
        True if the document was processed (or deferred to batch mode after
        a quota hit); False if processing failed with an API error.

    Note:
        - Updates document status in Firestore throughout processing.
        - Handles quota exhaustion (ResourceExhausted) by switching the
          document to batch mode and triggering batch processing.
        - Copies failed documents to the failure bucket.
        - Saves processed output as JSON to the designated output bucket.
    """
    try:
        # Record process_start_time by marking the queue entry as "processing".
        update_queue_status(gcs_input_uri, "processing")
        # The `api_endpoint` must be set when using a location other than "us".
        opts = ClientOptions(api_endpoint=f"{LOCATION}-documentai.googleapis.com")
        client = documentai.DocumentProcessorServiceClient(client_options=opts)
        # Full resource name for the processor.
        name = client.processor_path(PROJECT_ID, LOCATION, PROCESSOR_ID)
        # Point the request directly at the document in GCS (no local download).
        gcs_document = documentai.types.GcsDocument(
            gcs_uri=gcs_input_uri, mime_type=INPUT_MIME_TYPE
        )
        request = documentai.types.ProcessRequest(name=name, gcs_document=gcs_document)
        response = client.process_document(request=request)
        # Serialize the protobuf Document to a JSON string for storage.
        document_dict = MessageToDict(response.document._pb)
        response_json = json.dumps(document_dict)
        # Derive the output name from the GCS object's basename, dropping the
        # final extension. Falls back to the full basename when there is no
        # extension (the previous split-based approach produced an empty name).
        basename = gcs_input_uri.rsplit("/", 1)[-1]
        file_name = basename.rsplit(".", 1)[0] if "." in basename else basename
        # Save output as JSON to GCS.
        output_uri = save_json_to_gcs(
            response_json,
            GCS_OUTPUT_BUCKET,
            f"{file_name}.json",
            prefix_folder=GCS_OUTPUT_PREFIX,
        )
        # Mark the queue entry as completed, recording where the output lives.
        update_queue_status(gcs_input_uri, "completed", output_uri=output_uri)
        print(f"Processed document synchronously. Output saved to {output_uri}")
        return True  # Successfully processed the document synchronously
    except ResourceExhausted:
        print("Quota limit hit for synchronous processing")
        # Sync quota exhausted: hand the document off to the batch pipeline.
        # Returning True here means "handled", not "completed" — the batch
        # flow owns the document from this point on.
        update_sync_to_batch(gcs_input_uri)
        trigger_batch_processing()
        return True
    except GoogleAPIError as e:
        print(f"Error during synchronous processing: {e}")
        # Any other API error: mark the queue entry failed and preserve the
        # input file in the failure bucket for later inspection/replay.
        update_queue_status(gcs_input_uri, "failed", error=e)
        copy_failed_file_to_folder(
            gcs_input_uri, GCS_FAILED_FILES_BUCKET, GCS_FAILED_FILES_PREFIX
        )
        return False