def process_document()

in incubator-tools/docai_document_processing_pipeline/src/process_batch_cf/main.py

from google.api_core.client_options import ClientOptions
from google.cloud import documentai

# update_queue_status, copy_failed_file_to_folder, trigger_new_submission and the
# PROJECT_ID, LOCATION, PROCESSOR_ID, INPUT_MIME_TYPE and GCS_* constants are
# defined elsewhere in main.py.


def process_document(file_path: str) -> str:
    """
    Submits a document or folder for batch processing using Document AI.

    Args:
        file_path: GCS URI of the document or folder to process

    Returns:
        Batch operation ID for tracking, or an empty string if
        GCS_OUTPUT_BUCKET is not configured

    Note:
        - Handles both single documents and folders
        - Sets up callback for batch completion
        - Configures output location and processing parameters
        - Triggers new submission upon completion
    """

    # Called asynchronously once the batch operation completes
    def my_callback(future, file_path, batch_id, output_uri):
        if future.exception():
            print(f"Exception occurred when processing batch: {batch_id}")
            update_queue_status(
                file_path,
                "failed",
                error=f"Error occurred while processing batch operation: {batch_id}",
            )
            copy_failed_file_to_folder(
                file_path, GCS_FAILED_FILES_BUCKET, GCS_FAILED_FILES_PREFIX
            )
        else:
            print(f"Document {file_path} processed successfully")
            update_queue_status(file_path, "completed", output_uri=output_uri)

        # Whether the batch succeeded or failed, submit the next queued file
        trigger_new_submission()

    # You must set the `api_endpoint` if you use a location other than "us".
    opts = ClientOptions(api_endpoint=f"{LOCATION}-documentai.googleapis.com")
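    # e.g. LOCATION = "eu" resolves to "eu-documentai.googleapis.com"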

    # Create a Document AI client pointed at the regional endpoint
    client = documentai.DocumentProcessorServiceClient(client_options=opts)
    name = client.processor_path(PROJECT_ID, LOCATION, PROCESSOR_ID)
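    # name == "projects/{PROJECT_ID}/locations/{LOCATION}/processors/{PROCESSOR_ID}"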

    if not file_path.endswith("/"):
        # An exact GCS URI selects a single document to process
        gcs_document = documentai.GcsDocument(
            gcs_uri=file_path, mime_type=INPUT_MIME_TYPE
        )
        # Wrap the document in the list-based batch input config
        gcs_documents = documentai.GcsDocuments(documents=[gcs_document])
        input_config = documentai.BatchDocumentsInputConfig(gcs_documents=gcs_documents)
    else:
        # Specify a GCS URI Prefix to process an entire directory
        gcs_prefix = documentai.GcsPrefix(gcs_uri_prefix=file_path)
        input_config = documentai.BatchDocumentsInputConfig(gcs_prefix=gcs_prefix)

    if GCS_OUTPUT_BUCKET:
        # Build a normalized gs:// output URI from the configured bucket and prefix
        output_uri = f"gs://{GCS_OUTPUT_BUCKET.replace('gs://', '').rstrip('/')}/{GCS_OUTPUT_PREFIX.rstrip('/')}/"
        gcs_output_config = documentai.DocumentOutputConfig(
            gcs_output_config=documentai.DocumentOutputConfig.GcsOutputConfig(
                gcs_uri=output_uri
            )
        )

        request = documentai.BatchProcessRequest(
            name=name,
            input_documents=input_config,
            document_output_config=gcs_output_config,
        )

        # Start the long-running batch operation
        operation = client.batch_process_documents(request=request)

        # The operation name ends with the operation ID:
        # projects/{project}/locations/{location}/operations/{operation_id}
        batch_id = operation.operation.name

        # The callback receives the per-operation output folder
        # (<output_uri><operation_id>/), where the batch results are written
        operation.add_done_callback(
            lambda future: my_callback(
                future, file_path, batch_id, f"{output_uri}{batch_id.split('/')[-1]}/"
            )
        )

        return batch_id  # Batch operation ID used to track progress

    # GCS_OUTPUT_BUCKET is not configured; nothing was submitted
    return ""