# Excerpt: process_document_sync()
# Source: incubator-tools/docai_document_processing_pipeline/src/load_queue_cf/main.py


def process_document_sync(gcs_input_uri: str) -> bool:
    """
    Processes a document synchronously using Document AI.

    Args:
        gcs_input_uri: GCS URI of the document to process
            (e.g. "gs://bucket/path/file.pdf").

    Returns:
        True if processing succeeded (or was handed off to batch mode after
        a quota hit), False if it failed with another Google API error.

    Note:
        - Updates document status in the processing queue throughout
        - Handles quota exhaustion (ResourceExhausted) by switching to batch mode
        - Moves failed documents to the failure bucket
        - Saves processed output as JSON to the designated output bucket
    """

    try:
        # Record process_start_time by marking the queue entry as "processing".
        update_queue_status(gcs_input_uri, "processing")

        # The api_endpoint must be region-specific for locations other than "us".
        opts = ClientOptions(api_endpoint=f"{LOCATION}-documentai.googleapis.com")

        client = documentai.DocumentProcessorServiceClient(client_options=opts)

        # Full resource name of the processor:
        # projects/{project}/locations/{location}/processors/{processor}
        name = client.processor_path(PROJECT_ID, LOCATION, PROCESSOR_ID)

        # Point the request at the input document in GCS.
        gcs_document = documentai.types.GcsDocument(
            gcs_uri=gcs_input_uri, mime_type=INPUT_MIME_TYPE
        )

        request = documentai.types.ProcessRequest(name=name, gcs_document=gcs_document)

        response = client.process_document(request=request)

        # Serialize the Document proto to a JSON string via its protobuf message.
        document_dict = MessageToDict(response.document._pb)
        response_json = json.dumps(document_dict)

        # Derive the output name from the GCS URI by dropping the final
        # extension. Fall back to the full base name when there is no dot:
        # previously an extensionless input produced an empty name and the
        # output object was literally called ".json".
        base_name = gcs_input_uri.split("/")[-1]
        file_name = ".".join(base_name.split(".")[:-1]) or base_name

        # Save output as JSON to GCS.
        output_uri = save_json_to_gcs(
            response_json,
            GCS_OUTPUT_BUCKET,
            f"{file_name}.json",
            prefix_folder=GCS_OUTPUT_PREFIX,
        )

        # Updating queue status as completed.
        update_queue_status(gcs_input_uri, "completed", output_uri=output_uri)

        print(f"Processed document synchronously. Output saved to {output_uri}")

        return True  # Successfully processed the document synchronously

    except ResourceExhausted:
        print("Quota limit hit for synchronous processing")
        # Sync quota exhausted: hand the document off to the batch pipeline.
        # Returns True because the document remains on a success path.
        update_sync_to_batch(gcs_input_uri)
        trigger_batch_processing()
        return True

    except GoogleAPIError as e:
        print(f"Error during synchronous processing: {e}")
        # Any other API error: mark as failed and quarantine the input file.
        # NOTE(review): `error=e` passes the exception object itself — confirm
        # update_queue_status stringifies it before persisting.
        update_queue_status(gcs_input_uri, "failed", error=e)
        copy_failed_file_to_folder(
            gcs_input_uri, GCS_FAILED_FILES_BUCKET, GCS_FAILED_FILES_PREFIX
        )
        return False