# Document AI Processor Uptraining using Python

<table align="left">

  <td>
    <a href="https://colab.research.google.com/github/GoogleCloudPlatform/document-ai-samples/blob/main/uptraining_docai_processor_using_python/docai-uptraining.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/colab-logo-32px.png" alt="Colab logo"> Run in Colab
    </a>
  </td>

</table>


# Overview

[Document AI](https://cloud.google.com/document-ai/docs) is a document understanding solution that takes unstructured data (such as documents, emails, invoices, and forms) and makes the data easier to understand, analyze, and consume. The API provides structure through content classification, entity extraction, advanced searching, and more. With [Uptraining](https://cloud.google.com/document-ai/docs/workbench/uptrain-processor), you can achieve higher document processing accuracy by providing additional labeled examples for Specialized Document Types and creating a new model version.

In this notebook, you will create an Invoice Parser processor, configure the processor for uptraining, optionally label example documents, and uptrain the processor.

The document dataset used in this lab consists of randomly-generated invoices for a fictional piping company.

Note: This notebook is a Python version of the existing [Qwiklab](https://www.cloudskillsboost.google/focuses/67858?parent=catalog).

<hr/>

## User Authentication

In [None]:
from google.colab import auth as google_auth

google_auth.authenticate_user()

## Install Dependencies

In [None]:
!pip install google-cloud-documentai google-cloud-storage -q

## Restart the runtime

In [None]:
%%capture

import IPython

app = IPython.Application.instance()
app.kernel.do_shutdown(True)

## Import libraries

In [None]:
from google.cloud import documentai_v1beta3 as documentai
from google.longrunning.operations_pb2 import GetOperationRequest
from google.api_core.client_options import ClientOptions
import google.auth.transport.requests
from google import auth
from google.cloud import storage

import requests
import re
import time
from time import sleep
import json
from tqdm.auto import tqdm

## Initialize variables

In [None]:
project_id = "<YOUR PROJECT ID>"  # @param {type:"string"}
location = "us"  # @param {type:"string"}
processor_type = "INVOICE_PROCESSOR"  # @param {type:"string"}

# Processor display name
processor_display_name = "<DISPLAY NAME eg. invoice-test>"  # @param {type:"string"}

# GCS bucket path, to store the data
dataset_gcs_uri = "<GCS BUCKET URI eg. gs://invoice-api-test>"  # @param {type:"string"}

In [None]:
!gcloud auth application-default login

In [None]:
!gcloud config set project $project_id

In [None]:
!gcloud auth application-default set-quota-project $project_id

## Create Processor

In [None]:
def create_processor(project_id, location, processor_type, processor_display_name):
    """
    Function creates a Document AI processor,
    based on the provided processor type.
    """
    # Create a client
    client = documentai.DocumentProcessorServiceClient()

    parent = client.common_location_path(project_id, location)

    processor = documentai.Processor(
        type_=processor_type, display_name=processor_display_name
    )
    # Initialize request argument(s)
    request = documentai.CreateProcessorRequest(parent=parent, processor=processor)

    # Make the request
    response = client.create_processor(request=request)

    # Handle the response
    # print(response)

    return response

In [None]:
response = create_processor(
    project_id, location, processor_type, processor_display_name
)

In [None]:
# Get processor_resource_name
processor_name = response.name
processor_name
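Later cells recover the processor ID with `processor_name.split("/")[-1]`. The sketch below shows a slightly stricter way to take a resource name apart, assuming the layout `projects/{project}/locations/{location}/processors/{processor}`. The helper `parse_processor_name` and the sample name are hypothetical and not part of the Document AI client library.

```python
import re

# Assumed resource-name layout:
# projects/{project}/locations/{location}/processors/{processor}
_PROCESSOR_NAME_RE = re.compile(
    r"^projects/(?P<project>[^/]+)"
    r"/locations/(?P<location>[^/]+)"
    r"/processors/(?P<processor>[^/]+)$"
)


def parse_processor_name(name):
    """Split a processor resource name into its parts (hypothetical helper)."""
    match = _PROCESSOR_NAME_RE.match(name)
    if match is None:
        raise ValueError(f"Not a processor resource name: {name!r}")
    return match.groupdict()


# Made-up resource name, for illustration only
parts = parse_processor_name("projects/123456/locations/us/processors/abc123def")
print(parts["processor"])
```

Unlike a bare `split`, this raises an error when the string is, for example, a processor *version* name rather than a processor name.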

In [None]:
# Get the default processor version; it is used as the base for uptraining.
base_version = response.default_processor_version
base_version

## Create Dataset

In order to train your processor, you will have to create a dataset with training and testing data to help the processor identify the entities you want to extract.

You will need to create a new bucket in Cloud Storage to store the dataset.


In [None]:
def create_dataset_bucket(project_id, dataset_gcs_uri):
    """
    Function to create a GCS bucket,
    if it does not exist.
    """
    client = storage.Client(project=project_id)
    bucket = client.bucket(dataset_gcs_uri.split("//")[1])
    if not bucket.exists():
        tqdm.write(f"Creating bucket {bucket.name}")
        client.create_bucket(bucket)
        tqdm.write(f"Bucket {bucket.name} created")
    else:
        tqdm.write(f"Bucket {bucket.name} already exists")

In [None]:
# Create the dataset_gcs_uri bucket if it does not exist
create_dataset_bucket(project_id, dataset_gcs_uri)
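`create_dataset_bucket` derives the bucket name with `dataset_gcs_uri.split("//")[1]`, which only yields a valid bucket name when the URI has no path after the bucket. A minimal sketch of a more defensive parser follows; `split_gcs_uri` is a hypothetical helper built on the standard library, not a Cloud Storage API.

```python
from urllib.parse import urlparse


def split_gcs_uri(gcs_uri):
    """Return (bucket, prefix) for a gs:// URI (hypothetical helper)."""
    parsed = urlparse(gcs_uri)
    if parsed.scheme != "gs" or not parsed.netloc:
        raise ValueError(f"Expected a gs:// URI, got {gcs_uri!r}")
    # The network location is the bucket; the rest is the object prefix
    return parsed.netloc, parsed.path.lstrip("/")


print(split_gcs_uri("gs://invoice-api-test"))
print(split_gcs_uri("gs://invoice-api-test/datasets/invoices"))
```

With this split, the bucket part could be passed to `client.bucket(...)` while the prefix is kept for the dataset location.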

In [None]:
def poll_operation(operation_name, location):
    """
    Function to check status of long running operations.
    """
    # You must set the `api_endpoint` if you use a location other than "us".
    opts = ClientOptions(api_endpoint=f"{location}-documentai.googleapis.com")
    client = documentai.DocumentProcessorServiceClient(client_options=opts)

    request = GetOperationRequest(name=operation_name)

    while True:
        # Make GetOperation request
        operation = client.get_operation(request=request)

        # Stop polling when Operation is no longer running
        if operation.done:
            break

        tqdm.write(".", end="")
        # Wait 10 seconds before polling again
        sleep(10)

    tqdm.write("")
    return operation
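`poll_operation` loops until the operation reports `done`, with no upper bound on the wait. The sketch below shows the same polling pattern with a timeout, written against a generic callable so it runs without any Google Cloud access. `poll_until_done`, its parameters, and the fake operation are illustrative assumptions, not library APIs.

```python
import time
from types import SimpleNamespace


def poll_until_done(get_operation, interval_seconds=10.0,
                    timeout_seconds=7200.0, sleep=time.sleep):
    """Call get_operation() until its result has done=True, or raise on timeout."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        operation = get_operation()
        if operation.done:
            return operation
        if time.monotonic() >= deadline:
            raise TimeoutError("Operation did not finish before the timeout")
        sleep(interval_seconds)


# Fake operation that finishes on the third poll (stand-in for a real one)
states = iter([False, False, True])
result = poll_until_done(
    lambda: SimpleNamespace(done=next(states)),
    interval_seconds=0,
)
print(result.done)
```

In the notebook, the callable would wrap `client.get_operation(request=request)`; a two-hour default is an arbitrary choice that leaves room for the roughly one-hour training job.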

In [None]:
def add_processor_dataset(processor_name, dataset_gcs_uri, project_id, location):
    """
    Function to add dataset information to a processor.
    """
    # Create a client
    client = documentai.DocumentServiceClient()

    # User-managed dataset stored in your own bucket. For a dataset managed by the
    # Document AI service, see:
    # https://cloud.google.com/python/docs/reference/documentai/latest/google.cloud.documentai_v1beta3.types.Dataset
    gcs_managed_config = documentai.Dataset.GCSManagedConfig(
        gcs_prefix=documentai.GcsPrefix(gcs_uri_prefix=dataset_gcs_uri)
    )

    spanner_indexing_config = documentai.Dataset.SpannerIndexingConfig()

    # Initialize request argument(s)
    dataset = documentai.Dataset(
        name=client.dataset_path(project_id, location, processor_name.split("/")[-1]),
        gcs_managed_config=gcs_managed_config,
        spanner_indexing_config=spanner_indexing_config,
    )

    request = documentai.UpdateDatasetRequest(dataset=dataset)

    # Make the request
    operation = client.update_dataset(request=request)

    response = operation.result()

    # Handle the response
    # print(response)

    return response

In [None]:
response = add_processor_dataset(processor_name, dataset_gcs_uri, project_id, location)

## Import Documents

In [None]:
def import_documents(processor_name, location, gcs_uri_prefix, train_split):
    """
    Function to import documents to a processor,
    provided dataset uri and train_split ratio to split
    the data into train and test.
    """
    # Create a client
    client = documentai.DocumentServiceClient()

    # Initialize request argument(s)
    batch_input_config = documentai.BatchDocumentsInputConfig(
        gcs_prefix=documentai.GcsPrefix(gcs_uri_prefix=gcs_uri_prefix)
    )

    batch_documents_import_configs = (
        documentai.ImportDocumentsRequest.BatchDocumentsImportConfig(
            batch_input_config=batch_input_config
        )
    )

    if not isinstance(train_split, float):
        batch_documents_import_configs.dataset_split = train_split
    else:
        batch_documents_import_configs.auto_split_config.training_split_ratio = (
            train_split
        )

    dataset = client.dataset_path(project_id, location, processor_name.split("/")[-1])
    request = documentai.ImportDocumentsRequest(
        dataset=dataset,
        batch_documents_import_configs=[batch_documents_import_configs],
    )

    # Make the request
    operation = client.import_documents(request=request)

    print("Waiting for operation to complete...")
    operation = poll_operation(operation.operation.name, location)

    print("Documents are imported successfully")
    return operation

In [None]:
## Import a sample document
sample_doc_gcs_uri_prefix = (
    "gs://cloud-samples-data/documentai/codelabs/uptraining/pdfs"
)

In [None]:
operation = import_documents(
    processor_name,
    location,
    sample_doc_gcs_uri_prefix,
    train_split=documentai.DatasetSplitType.DATASET_SPLIT_UNASSIGNED,
)

#### [OPTIONAL] Label the test document

Follow the [instructions](https://www.cloudskillsboost.google/focuses/67858?parent=catalog#step9) to label the sample document in the Document AI console.

### Import Pre-Labeled Data

Document AI Uptraining requires a minimum of 10 documents in both the training and test sets, along with 10 instances of each label in each set. It's recommended to have at least 50 documents in each set with 50 instances of each label for best performance. More training data generally equates to higher accuracy.

It will take a long time to manually label 100 documents, so we have some pre-labeled documents that you can import for this lab. You can import pre-labeled document files in the [Document.json](https://cloud.google.com/document-ai/docs/reference/rest/v1/Document) format. These can be results from calling a processor and verifying the accuracy using [Human in the Loop (HITL)](https://cloud.google.com/document-ai/hitl).
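The minimums above (10 documents per split and 10 instances of each label per split) can be checked before starting a training job. The sketch below assumes each document is represented simply as a list of its label names; that shape, and the helper `check_split`, are hypothetical and chosen only to keep the example self-contained.

```python
from collections import Counter

MIN_DOCUMENTS = 10        # minimum documents per split, as stated above
MIN_LABEL_INSTANCES = 10  # minimum instances of each label per split


def check_split(documents, required_labels):
    """Return a list of problems for one split (training or test).

    documents: list of lists of label names, one inner list per document.
    """
    problems = []
    if len(documents) < MIN_DOCUMENTS:
        problems.append(f"only {len(documents)} documents, need {MIN_DOCUMENTS}")
    counts = Counter(label for doc in documents for label in doc)
    for label in required_labels:
        if counts[label] < MIN_LABEL_INSTANCES:
            problems.append(
                f"label {label!r} has {counts[label]} instances, "
                f"need {MIN_LABEL_INSTANCES}"
            )
    return problems


# Twelve made-up documents that never contain supplier_name
train_docs = [["invoice_date", "total_amount"]] * 12
print(check_split(train_docs, ["invoice_date", "total_amount", "supplier_name"]))
```

An empty list means the split meets the minimums; the recommended 50 documents and 50 instances could be checked by raising the two constants.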

In [None]:
## Import all documents
docs_gcs_uri_prefix = "gs://cloud-samples-data/documentai/Custom/Invoices/JSON"

In [None]:
batch_operation = import_documents(
    processor_name, location, docs_gcs_uri_prefix, train_split=0.8
)

## Update schema with required fields

The sample documents we are using for this example do not contain every label supported by the Invoice Parser. We will need to mark the labels we are not using as inactive before training. You can also follow similar steps to add a custom label before Uptraining.

In [None]:
def get_dataset_schema(processor_name):
    """
    Function to get the existing processor schema.
    """
    # Create a client
    client = documentai.DocumentServiceClient()

    # Initialize request argument(s)
    request = documentai.GetDatasetSchemaRequest(
        name=client.dataset_schema_path(
            project_id, location, processor_name.split("/")[-1]
        ),
    )

    # Make the request
    response = client.get_dataset_schema(request=request)

    return response

In [None]:
schema = get_dataset_schema(processor_name)

In [None]:
# Fields that need to be enabled, based on the imported dataset
enable_fields = [
    "invoice_date",
    "line_item/amount",
    "line_item/description",
    "line_item/quantity",
    "amount",
    "description",
    "receiver_address",
    "receiver_name",
    "supplier_address",
    "supplier_name",
    "total_amount",
]

In [None]:
def update_schema_fields(schema, enable_fields):
    """
    Function to update the schema with required fields.
    """
    for entity in schema.document_schema.entity_types:
        for prop in entity.properties:
            if prop.name not in enable_fields:
                prop.property_metadata = {"inactive": True}
    return schema
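To see what `update_schema_fields` does without calling the API, the sketch below applies the same loop to small stand-in objects. The `Prop` and `EntityType` dataclasses and the field names in the example are hypothetical simplifications of the real schema types, used only for illustration.

```python
from dataclasses import dataclass, field


@dataclass
class Prop:
    """Stand-in for a schema property (hypothetical, simplified)."""
    name: str
    property_metadata: dict = field(default_factory=dict)


@dataclass
class EntityType:
    """Stand-in for a schema entity type (hypothetical, simplified)."""
    properties: list


def mark_inactive(entity_types, enable_fields):
    """Mark every property not listed in enable_fields as inactive."""
    enabled = set(enable_fields)
    deactivated = []
    for entity in entity_types:
        for prop in entity.properties:
            if prop.name not in enabled:
                prop.property_metadata = {"inactive": True}
                deactivated.append(prop.name)
    return deactivated


entity_types = [EntityType([Prop("invoice_date"), Prop("currency"), Prop("total_amount")])]
print(mark_inactive(entity_types, ["invoice_date", "total_amount"]))
```

Returning the list of deactivated names makes it easy to review which labels will be excluded before the schema update is sent.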

In [None]:
def update_dataset_schema(schema):
    """
    Function to update the dataset schema,
    with the updated schema fields.
    """
    # Create a client
    client = documentai.DocumentServiceClient()

    # Initialize request argument(s)
    dataset_schema = documentai.DatasetSchema(
        name=schema.name,
        document_schema=schema.document_schema,
    )

    request = documentai.UpdateDatasetSchemaRequest(dataset_schema=dataset_schema)

    # Make the request
    response = client.update_dataset_schema(request=request)

    # Handle the response
    # print(response)

    return response

In [None]:
schema = update_schema_fields(schema, enable_fields)
response = update_dataset_schema(schema)

## [OPTIONAL] Auto-label newly imported documents

When importing unlabeled documents for a processor with an existing deployed processor version, you can use [Auto-labeling](https://cloud.google.com/document-ai/docs/workbench/label-documents#auto-label) to save time on labeling.

## Processor Uptraining

In [None]:
def train_processor_version(processor_name, display_name, base_version):
    """
    Function to train a new processor version,
    built on top of the provided base version.
    """
    # Create a client
    client = documentai.DocumentProcessorServiceClient()

    # Initialize request argument(s)
    processor_version = documentai.ProcessorVersion(
        display_name=display_name,
    )

    request = documentai.TrainProcessorVersionRequest(
        parent=processor_name,
        processor_version=processor_version,
        base_processor_version=base_version,
    )

    # Make the request
    operation = client.train_processor_version(request=request)

    print("Training job is triggered")
    return operation

In [None]:
display_name = "lab-uptraining-test-1"  # @param {type:"string"}

operation = train_processor_version(processor_name, display_name, base_version)
uptrained_version = operation.metadata.common_metadata.resource
uptrained_version

#### Note: The training job will take around an hour, so come back later to proceed further.

## Get Evaluation

In [None]:
def get_processor_version_info(processor_name):
    """
    Function to get the processor version info.
    """
    # Create a client
    client = documentai.DocumentProcessorServiceClient()

    # Initialize request argument(s)
    request = documentai.GetProcessorVersionRequest(
        name=processor_name,
    )

    # Make the request
    response = client.get_processor_version(request=request)

    # Handle the response
    return response

In [None]:
def get_f1score(operation, uptrained_version):
    """
    Function to get the F1 score of the newly trained model.
    """
    # Block until training finishes; the operation name lives on the wrapped protobuf
    poll_operation(operation.operation.name, location)
    processor_version_info = get_processor_version_info(uptrained_version)
    f1_score = processor_version_info.latest_evaluation.aggregate_metrics.f1_score
    return f1_score

In [None]:
f1_score = get_f1score(operation, uptrained_version)
f1_score
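As a reminder of what the reported number means, the F1 score is the harmonic mean of precision and recall. The sketch below computes it from the two values; the function name and the sample numbers are illustrative and do not come from an actual evaluation.

```python
def f1_from_precision_recall(precision, recall):
    """Harmonic mean of precision and recall; 0.0 when both are zero."""
    if precision + recall == 0:
        return 0.0
    return 2 * precision * recall / (precision + recall)


# Made-up values: high precision, lower recall
print(round(f1_from_precision_recall(0.9, 0.75), 3))
```

Because the harmonic mean is pulled toward the smaller value, a model with high precision but poor recall (or the reverse) still gets a low F1 score.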

## Deploy trained processor

Once the model is trained, you can deploy it for use in the document processing workflow. In this notebook, the new version is deployed only if its F1 score meets the threshold you set.

In [None]:
def deploy_processor_version(processor_version):
    """
    Function to deploy the processor version.
    """
    # Create a client
    client = documentai.DocumentProcessorServiceClient()

    # Initialize request argument(s)
    request = documentai.DeployProcessorVersionRequest(
        name=processor_version,
    )

    # Make the request
    operation = client.deploy_processor_version(request=request)

    print("Waiting for operation to complete...")

    return operation

In [None]:
# Set threshold for F1 score
threshold = 0.8  # @param {type:"number"}

# Deploy if the F1 score of the newly trained model is at least the threshold
if f1_score >= threshold:
    operation = deploy_processor_version(uptrained_version)
    op_name = operation.operation.name
    op_response = poll_operation(op_name, location)
    print("Processor is deployed")
else:
    print("The F1 score is below threshold")

## Document Processing

Once the model is deployed, you can use it in the document processing workflow by using the provided sample [code](https://cloud.google.com/document-ai/docs/process-documents-client-libraries#client-libraries-usage-python).