in incubator-tools/docai_document_processing_pipeline/src/process_batch_cf/main.py [0:0]
def process_document(file_path: str) -> str:
    """
    Submits a document or folder for batch processing using Document AI.

    Args:
        file_path: GCS URI of the document or folder to process. A URI that
            ends with "/" is submitted as a GCS prefix (whole folder); any
            other URI is submitted as a single document with
            INPUT_MIME_TYPE.

    Returns:
        Batch operation ID (the operation name) for tracking, or an empty
        string when GCS_OUTPUT_BUCKET is not set and nothing was submitted.

    Note:
        - Handles both single documents and folders
        - Sets up callback for batch completion
        - Configures output location and processing parameters
        - Triggers new submission upon completion
    """

    def my_callback(future, file_path, batch_id, output_uri):
        # Runs once the batch operation has finished. The future is always
        # "done" at this point, so success and failure must be distinguished
        # by the presence of an exception -- checking done() separately would
        # overwrite a "failed" status with "completed".
        if future.exception():
            print(f"Exception occured when processing batch: {batch_id}")
            update_queue_status(
                file_path,
                "failed",
                error=f"Error occured while processing batch operation: {batch_id}",
            )
            copy_failed_file_to_folder(
                file_path, GCS_FAILED_FILES_BUCKET, GCS_FAILED_FILES_PREFIX
            )
        else:
            print(f"Document {file_path} processed successfully")
            update_queue_status(file_path, "completed", output_uri=output_uri)
        # Called after either outcome, as in the original flow.
        trigger_new_submission()

    # You must set the `api_endpoint` if you use a location other than "us".
    opts = ClientOptions(api_endpoint=f"{LOCATION}-documentai.googleapis.com")

    # Client and fully-qualified processor name for the batch request
    client = documentai.DocumentProcessorServiceClient(client_options=opts)
    name = client.processor_path(PROJECT_ID, LOCATION, PROCESSOR_ID)

    if file_path.endswith("/"):
        # Specify a GCS URI Prefix to process an entire directory
        gcs_prefix = documentai.GcsPrefix(gcs_uri_prefix=file_path)
        input_config = documentai.BatchDocumentsInputConfig(gcs_prefix=gcs_prefix)
    else:
        # Specify specific GCS URIs to process individual documents
        gcs_document = documentai.GcsDocument(
            gcs_uri=file_path, mime_type=INPUT_MIME_TYPE
        )
        # Load GCS Input URI into a List of document files
        gcs_documents = documentai.GcsDocuments(documents=[gcs_document])
        input_config = documentai.BatchDocumentsInputConfig(gcs_documents=gcs_documents)

    # Without an output bucket there is nowhere to write results, so nothing
    # is submitted and an empty batch ID is returned.
    if not GCS_OUTPUT_BUCKET:
        return ""

    # Normalise the bucket (optional "gs://" scheme, trailing "/") and the
    # prefix (trailing "/") so the output URI always has the form
    # gs://<bucket>/<prefix>/
    bucket_name = GCS_OUTPUT_BUCKET.replace("gs://", "").rstrip("/")
    output_prefix = GCS_OUTPUT_PREFIX.rstrip("/")
    output_uri = f"gs://{bucket_name}/{output_prefix}/"

    # Configuring the batch process request
    gcs_output_config = documentai.DocumentOutputConfig(
        gcs_output_config=documentai.DocumentOutputConfig.GcsOutputConfig(
            gcs_uri=output_uri
        )
    )
    request = documentai.BatchProcessRequest(
        name=name,
        input_documents=input_config,
        document_output_config=gcs_output_config,
    )

    # Call Document AI API to start batch process
    operation = client.batch_process_documents(request=request)
    batch_id = operation.operation.name

    # The URI recorded on success is the output URI plus the last path
    # segment of the operation name.
    batch_output_uri = f"{output_uri}{batch_id.split('/')[-1]}/"
    operation.add_done_callback(
        lambda future: my_callback(future, file_path, batch_id, batch_output_uri)
    )
    return batch_id  # Return batch process ID