# Document Level Accuracy

* Author: docai-incubator@google.com

## Disclaimer

This tool is not supported by the Google engineering team or product team. It is provided and supported on a best-effort basis by the **DocAI Incubator Team**. No guarantees of performance are implied. 

## Objective
This tool takes annotated documents (JSON files) from a GCS bucket as input and runs the same (image) files through the designated version of the processor. The annotated JSON files are then compared with the processed JSON files, and the differences are written to a CSV file together with document-level accuracy stats.

## Step by Step procedure 

### 1. Install and import the required libraries

In [None]:
!pip install pandas numpy google-cloud-storage google-cloud-documentai==2.16.0 PyPDF2 configparser pillow
!wget https://raw.githubusercontent.com/GoogleCloudPlatform/document-ai-samples/main/incubator-tools/best-practices/utilities/utilities.py

In [50]:
import pandas as pd
import operator
import difflib
import json
import os
import pandas as pd
import time
import numpy as np
from google.cloud import storage
from google.cloud import documentai_v1beta3
from PIL import Image
from typing import (
    Container,
    Iterable,
    Iterator,
    List,
    Mapping,
    Optional,
    Sequence,
    Tuple,
    Union,
)
from PyPDF2 import PdfFileReader
import configparser
import warnings
import ast
import io
import re
import traceback
import datetime
from typing import Any, Dict, List, Optional, Sequence, Tuple, Union
import utilities

### 2. Set up the required input details

In [51]:
# Project ID of the Google Cloud project that is used.
project_id = "your-project-id"
# Location where the Document AI processor was created (e.g. "us" or "eu").
location = "your-location"
# ID of the Document AI processor the documents are run through.
processor_id = "your-processor-id"
# ID of the processor version to evaluate.
processor_version = "your-processor-version"
# GCS path (gs://...) of the annotated (ground truth) JSON files.
groundtruth_bucket_uri = "gs://your-bucket-uri"
# Entity types that decide document level accuracy; an empty list means
# every entity is compared.
critical_entities = ["entity1", "entity2", "entity3", "entity4", "entity5"]

Enter the input details with necessary information as outlined below:

- `project_id`: Provide the project ID of your Google Cloud project.
- `groundtruth_bucket_uri`: Provide the Google Cloud Storage (GCS) path of the annotated JSON files.
- `critical_entities`: Provide a list of critical entities for which you require document level accuracy.
  - Example: `['invoice_id','invoice_date','receiver_name','receiver_address','supplier_name']`
- `processor_id`: Provide the processor ID of your Document AI processor.
- `processor_version`: Provide the processor version ID.
- `location`: Specify the location (e.g., 'us' or 'eu') where your processor is created.

Note: If the critical_entities parameter is provided as an empty list then the tool will compare all the entities.

In [52]:
def f1_calculator(merged_df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Calculates Accuracy, Precision, Recall and F1-Score for entities in a dataframe.

    Metrics are computed once over every row (reported as "All Entities") and
    then once per distinct value of the "entity_name" column. Rows are counted
    by the label held in their "match" column ("TP", "FP", "FN" or "TN"); rows
    carrying any other label are not counted.

    Args:
    merged_df (pd.DataFrame): A pandas DataFrame containing the merged data.
        It must provide the "entity_name" and "match" columns.

    Returns:
    Tuple[pd.DataFrame, pd.DataFrame]: A tuple containing two DataFrames.
        The first DataFrame contains the calculated metrics, one row per
        entity preceded by the "All Entities" row.
        The second DataFrame is the original input DataFrame.

    Raises:
    KeyError: If "entity_name" or "match" is missing from merged_df.
    """

    def safe_ratio(numerator, denominator):
        # Each metric is guarded by its own denominator, so a subset without
        # true positives still gets a correct Accuracy when it has true negatives.
        return round(numerator / denominator, 2) if denominator else 0

    # Function to calculate metrics
    def calculate_metrics(subset_df, entity_name):
        label_counts = subset_df["match"].value_counts()
        tp_count = int(label_counts.get("TP", 0))
        fp_count = int(label_counts.get("FP", 0))
        fn_count = int(label_counts.get("FN", 0))
        tn_count = int(label_counts.get("TN", 0))
        return {
            "entity_name": entity_name,
            "Accuracy": safe_ratio(
                tp_count + tn_count, tp_count + fp_count + fn_count + tn_count
            ),
            "Precision": safe_ratio(tp_count, tp_count + fp_count),
            "Recall": safe_ratio(tp_count, tp_count + fn_count),
            "F1-Score": safe_ratio(2 * tp_count, 2 * tp_count + fp_count + fn_count),
        }

    # Metrics for all entities come first, followed by one row per entity
    metrics_list = [calculate_metrics(merged_df, "All Entities")]
    metrics_list.extend(
        calculate_metrics(merged_df[merged_df["entity_name"] == entity], entity)
        for entity in merged_df["entity_name"].unique()
    )

    # Convert list of dictionaries to DataFrame
    All_metrics = pd.DataFrame(metrics_list)
    return All_metrics, merged_df


def doc_proto_to_dataframe(data: documentai_v1beta3.Document) -> pd.DataFrame:
    """Flatten the entities of a Document Proto object into a DataFrame.

    An entity that has properties contributes one row per property and no row
    of its own; an entity without properties contributes a single row.

    Args:
        data (documentai_v1beta3.Document): It is Document Proto Object

    Returns:
        pd.DataFrame: DataFrame with the columns "type_", "mention_text" and
            "bbox"; it has no rows when the document has no entities
    """

    entity_frame = pd.DataFrame(columns=["type_", "mention_text", "bbox"])
    if not data.entities:
        print("No entities Found")
        return entity_frame

    for parent in data.entities:
        children = parent.properties
        # Only the children are recorded when a parent has properties
        items_to_record = children if children else [parent]
        for item in items_to_record:
            entity_frame = add_entity_to_dataframe(item, entity_frame)
    return entity_frame


def add_entity_to_dataframe(
    entity: documentai_v1beta3.Document.Entity, df: pd.DataFrame
) -> pd.DataFrame:
    """It will append entity data to given DataFrame

    The new row is [type_, mention_text, bbox]. An entity without mention text
    is recorded with the text "Entity not found." and an empty bbox. An entity
    with mention text but without a usable bounding polygon (no page
    reference, or a polygon that does not have exactly four normalized
    vertices) keeps its text and also gets an empty bbox instead of raising.

    Args:
        entity (documentai_v1beta3.Document.Entity): An entity from Document Object
        df (pd.DataFrame): Target Dataframe to add an entity as new row; it is
            modified in place

    Returns:
        pd.DataFrame: It is a Dataframe with newly appended entity as row
    """

    if not entity.mention_text:
        df.loc[len(df.index)] = [entity.type_, "Entity not found.", []]
        return df

    bbox = []
    page_refs = entity.page_anchor.page_refs
    if page_refs:
        # Only the first page reference is used for the bounding box
        vertices = page_refs[0].bounding_poly.normalized_vertices
        if len(vertices) == 4:
            # The 1st and 3rd vertices are stored as [x1, y1, x2, y2];
            # presumably these are opposite corners of the box - TODO confirm
            first_vertex, third_vertex = vertices[0], vertices[2]
            bbox = [first_vertex.x, first_vertex.y, third_vertex.x, third_vertex.y]
    df.loc[len(df.index)] = [entity.type_, entity.mention_text, bbox]
    return df


def compare_doc_proto_convert_dataframe(
    file1: documentai_v1beta3.Document, file2: documentai_v1beta3.Document
) -> Tuple[pd.DataFrame, np.float64]:
    """Compares the entities between two files and returns the results in a dataframe

    Matching happens in three passes:
      1. Entity types that occur exactly once in both documents are paired
         directly.
      2. The remaining entities of file1 are paired through
         utilities.find_match; an unmatched one gets "Entity not found." as
         its current prediction.
      3. Entities left over in file2 are added with "Entity not found." as
         their initial prediction.

    Args:
        file1 (documentai_v1beta3.Document): It is Document Proto Object
        file2 (documentai_v1beta3.Document): It is also Document Proto Object to compare with previous

    Returns:
        Tuple[pd.DataFrame, np.float64]: It returns Dataframe and matched score
                                            between two Document Protos. The
                                            Dataframe has the columns
                                            "entity_name", "initial_prediction",
                                            "current_prediction", "match" and
                                            "fuzzy ratio"; the score is the mean
                                            fuzzy ratio, or 0 when there are no rows
    """

    df_file1 = doc_proto_to_dataframe(file1)
    df_file2 = doc_proto_to_dataframe(file2)

    # find entities which are present only once in both files
    # these entities will be matched directly; sorting keeps the row order
    # reproducible between runs
    file1_type_counts = df_file1["type_"].value_counts()
    file2_type_counts = df_file2["type_"].value_counts()
    common_entities = sorted(
        entity
        for entity, count in file1_type_counts.items()
        if count == 1 and file2_type_counts.get(entity, 0) == 1
    )

    df_compare = pd.DataFrame(
        columns=["entity_name", "initial_prediction", "current_prediction"]
    )
    for entity in common_entities:
        value1 = df_file1[df_file1["type_"] == entity].iloc[0]["mention_text"]
        value2 = df_file2[df_file2["type_"] == entity].iloc[0]["mention_text"]
        df_compare.loc[len(df_compare.index)] = [entity, value1, value2]
        # common entities are removed from df_file1 and df_file2
        df_file1 = utilities.remove_row(df_file1, entity)
        df_file2 = utilities.remove_row(df_file2, entity)

    # remaining entities are matched through utilities.find_match
    current_predictions = []
    for row in df_file1.values:
        matched_index = utilities.find_match(row, df_file2)
        if matched_index is not None:
            # Select the text by column label; positional access on a
            # label-indexed Series is deprecated in pandas
            current_predictions.append(df_file2.loc[matched_index]["mention_text"])
            # a matched file2 entity must not be paired a second time
            df_file2 = df_file2.drop(matched_index)
        else:
            current_predictions.append("Entity not found.")

    df_file1["mention_text2"] = current_predictions
    df_file1 = df_file1.drop(["bbox"], axis=1)
    df_file1 = df_file1.rename(
        columns={
            "type_": "entity_name",
            "mention_text": "initial_prediction",
            "mention_text2": "current_prediction",
        }
    )
    df_compare = pd.concat([df_compare, df_file1], ignore_index=True)

    # adding entities which are present in file2 but not in file1
    for row in df_file2.values:
        df_compare.loc[len(df_compare.index)] = [row[0], "Entity not found.", row[1]]

    df_compare["match"] = (
        df_compare["initial_prediction"] == df_compare["current_prediction"]
    )
    df_compare["fuzzy ratio"] = df_compare.apply(utilities.get_match_ratio, axis=1)
    if list(df_compare.index):
        score = df_compare["fuzzy ratio"].sum() / len(df_compare.index)
    else:
        score = 0
    return df_compare, score


def classify_row(row: pd.Series) -> str:
    """
    Labels a comparison row as 'TN', 'FN', 'FP' or 'TP'.

    The label depends on whether 'initial_prediction' and 'current_prediction'
    hold the placeholder "Entity not found." and, when neither does, on
    whether the two values are equal.

    Args:
    row (pd.Series): A row from a pandas DataFrame, expected to contain
                     'initial_prediction' and 'current_prediction' columns.

    Returns:
    str: The classification result, which can be 'TN', 'FN', 'FP', 'TP', or an error message.
    """
    placeholder = "Entity not found."
    initial_value = row["initial_prediction"]
    current_value = row["current_prediction"]
    absence = (
        bool(initial_value == placeholder),
        bool(current_value == placeholder),
    )

    # Both values present: equal values are a hit, differing values count as FP
    if absence == (False, False):
        return "TP" if initial_value == current_value else "FP"

    label_by_absence = {
        (True, True): "TN",
        (False, True): "FN",
        (True, False): "FP",
    }
    return label_by_absence.get(absence, "Something went Wrong.")

In [None]:
# Driver cell: copies the annotated JSON files into a temporary bucket, runs
# each of them through the processor, compares annotated and processed
# entities, prints a document level accuracy summary and writes the details to
# "Document_Level_Accuracy.csv".

# The temporary bucket names are bound before the try block so that the
# cleanup in `finally` can always reference them.
time_stamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
groundtruth_bucket_name = "groundtruth-vb_temp_" + time_stamp
parsed_output_bucket_name = "processedoutput-vb_temp_" + time_stamp

try:
    storage_client = storage.Client()

    print("Creating temporary buckets")

    # Extract the ground truth bucket name from the URI (gs://<bucket>/...)
    ground_truth_bucket = groundtruth_bucket_uri.split("/")[2]

    # Create temporary buckets
    utilities.check_create_bucket(groundtruth_bucket_name)
    utilities.check_create_bucket(parsed_output_bucket_name)
    warnings.simplefilter(action="ignore", category=FutureWarning)

    # Bound up front so a failed copy step cannot leave the name undefined
    # (or pointing at the result of an earlier notebook run).
    files_list = []
    try:
        ground_truth_files, ground_truth_dict = utilities.file_names(
            groundtruth_bucket_uri
        )
        print("Copying files to temporary bucket")
        for file_name in ground_truth_files:
            utilities.copy_blob(
                ground_truth_bucket,
                ground_truth_dict[file_name],
                groundtruth_bucket_name,
                file_name,
            )

        # List files in the new bucket
        files_list = [
            blob.name
            for blob in storage_client.bucket(groundtruth_bucket_name).list_blobs()
        ]
    except Exception as e:
        print("Unable to process files due to: ", e)

    # Run every annotated document through the processor and store the result
    # under the same name in the parsed output bucket. A file that fails at
    # any step is reported and skipped so the remaining files still run.
    for file_name in files_list:
        print(groundtruth_bucket_name, file_name)
        try:
            input_path_json = utilities.blob_downloader(
                groundtruth_bucket_name, file_name
            )
            pdf_bytes, synthesized_images = utilities.create_pdf_bytes_from_json(
                input_path_json
            )
            res = utilities.process_document_sample(
                project_id, location, processor_id, pdf_bytes, processor_version
            )
            document_json = documentai_v1beta3.Document.to_json(res.document).encode(
                "utf-8"
            )
            blob = storage_client.bucket(parsed_output_bucket_name).blob(file_name)
            blob.upload_from_string(document_json, content_type="application/json")
        except Exception as e:
            print(f"Unable to process file {file_name} due to: ", e)

    (
        relation_dict,
        relation_non_matched_files_dict,
    ) = utilities.matching_files_two_buckets(
        groundtruth_bucket_name, parsed_output_bucket_name
    )

    per_document_frames = []
    accuracy_docs = []
    print("comparing the Annotated Jsons and Processed jsons ....Wait for Summary ")
    for i in relation_dict:
        groundtruth_json = utilities.blob_downloader(groundtruth_bucket_name, i)
        parsed_output_json = utilities.blob_downloader(
            parsed_output_bucket_name, relation_dict[i]
        )

        groundtruth_json_proto = documentai_v1beta3.Document.from_json(
            json.dumps(groundtruth_json)
        )
        parsed_output_json_proto = documentai_v1beta3.Document.from_json(
            json.dumps(parsed_output_json)
        )

        test_harness_output = compare_doc_proto_convert_dataframe(
            groundtruth_json_proto, parsed_output_json_proto
        )[0]

        # Replace the boolean match flag with the TP / FP / FN / TN label
        test_harness_output["match"] = test_harness_output.apply(classify_row, axis=1)

        column = [relation_dict[i]] * test_harness_output.shape[0]
        test_harness_output.insert(loc=0, column="File Name", value=column)

        # A document is accurate when none of the rows that count (the
        # critical entities, or every row when no critical entity is
        # configured) is labelled FP or FN.
        if critical_entities:
            scored_rows = test_harness_output[
                test_harness_output["entity_name"].isin(critical_entities)
            ]
        else:
            scored_rows = test_harness_output
        if scored_rows["match"].isin(["FP", "FN"]).any():
            Document_accuracy = "NO"
        else:
            Document_accuracy = "YES"

        # Keep the verdict next to the rows so it is part of the CSV
        test_harness_output["Document Level Accuracy"] = Document_accuracy
        accuracy_docs.append({i: Document_accuracy})
        per_document_frames.append(test_harness_output)

    if not per_document_frames:
        raise RuntimeError(
            "No annotated/processed file pairs were available to compare."
        )
    test_harness_merged = pd.concat(per_document_frames)

    # Entity level metrics; kept in `output` for inspection in later cells
    output = f1_calculator(test_harness_merged)[0]

    document_verdicts = [
        verdict for doc_result in accuracy_docs for verdict in doc_result.values()
    ]
    Match_YES = document_verdicts.count("YES")
    Match_NO = document_verdicts.count("NO")
    print("*******************SUMMARY**************************")
    print("NO OF DOCUMENTS HAVE 100% DOCUMENT ACCURACY =", Match_YES)
    print("NO OF DOCUMENTS DOESNT HAVE 100% DOCUMENT ACCURACY =", Match_NO)

    rejected_docs = [
        doc_name
        for doc_result in accuracy_docs
        for doc_name, verdict in doc_result.items()
        if verdict == "NO"
    ]

    print("\n")
    print(
        "LIST OF DOCUMENTS WHICH DOESNT HAVE 100% DOCUMENT ACCURACY\n",
        rejected_docs,
        "\n",
    )

    # The CSV holds the rows of the critical entities, or every row when no
    # critical entity is configured.
    if critical_entities:
        df = test_harness_merged[
            test_harness_merged["entity_name"].isin(critical_entities)
        ]
    else:
        df = test_harness_merged
    # mergesort is stable, so rows of one file keep their relative order
    df = df.sort_values(by=["File Name"], kind="mergesort")
    df = df.reset_index(drop=True)
    df.to_csv("Document_Level_Accuracy.csv")
    print("***********FOR DETAILS SEE THE CSV FILE CREATED******************")

except Exception as e:
    print("Unable to process the file:", e)
    traceback.print_exc()

finally:
    # The temporary buckets are removed on success and on failure alike. A
    # failed deletion is reported instead of being silently ignored, and it
    # does not prevent the attempt on the other bucket.
    for temp_bucket_name in (groundtruth_bucket_name, parsed_output_bucket_name):
        try:
            utilities.bucket_delete(temp_bucket_name)
        except Exception as inner_e:
            print("Error during bucket deletion:", inner_e)

### **Output**

The CSV file lists the details of every mismatch, as shown below, together with the document-level accuracy as 'YES' or 'NO'.
If all the entities in the annotated JSON and the processed JSON match 100%, the document-level accuracy is shown as YES; otherwise it is shown as NO.

<td><img src="./images/output.png" width=800 height=400></td>