# Date Entities Annotation Tool


* Author: docai-incubator@google.com

## Disclaimer

This tool is not supported by the Google engineering team or product team. It is provided and supported on a best-effort basis by the DocAI Incubator Team. No guarantees of performance are implied.

## Objective

This tool annotates date entities in a table as line items. Each date entity is named after the header of the line items table it falls under, and you can edit the header-to-entity mapping to suit your documents. The notebook expects input files that contain Invoice Parser JSON output. A date is annotated only if both the date and its table header are present in the OCR output. The result is a JSON file with the labeled date entities added as line items, exported to a storage bucket path. These JSON files can be imported into a processor to review the annotations and can be used for training.
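
As a rough illustration of which date formats are detected, the sketch below applies the same regular expression that `get_dates()` uses later in this notebook to a short string. The `find_dates` helper and the sample text are made up for this example and are not part of the tool.

```python
import re

# Same pattern as in get_dates() further down in this notebook.
DATE_PATTERN = (
    r"\b(?:\d{1,2}\/\d{1,2}\/\d{2,4}"
    r"|\d{1,2}-\d{1,2}-\d{2,4}"
    r"|(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)[a-z]* \d{1,2} \d{4})\b"
)


def find_dates(text: str) -> list:
    """Return each date-like match together with its character offsets."""
    return [
        {"match": m.group(), "startIndex": m.start(), "endIndex": m.end()}
        for m in re.finditer(DATE_PATTERN, text)
    ]


# Hypothetical OCR text, invented for illustration only.
sample_text = "From 01/02/2023 To 15-02-2023 paid Mar 3 2023, due March 5, 2023"
for hit in find_dates(sample_text):
    print(hit)
```

In this sample, `01/02/2023`, `15-02-2023` and `Mar 3 2023` are matched, while `March 5, 2023` is not, because the pattern does not allow a comma between the day and the year. Dates written in other formats would need an extended pattern.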

## Prerequisites

* Vertex AI Notebook
* Input JSON files
* GCS bucket for processing the input documents and writing the output.


## Step by Step procedure

### 1.Importing required modules

In [None]:
# Download incubator-tools utilities module to present-working-directory
!wget https://raw.githubusercontent.com/GoogleCloudPlatform/document-ai-samples/main/incubator-tools/best-practices/utilities/utilities.py

In [None]:
!pip install google-cloud-storage google-cloud-documentai tqdm -q

In [None]:
import utilities
from google.cloud import storage
from google.cloud import documentai_v1beta3 as documentai
from pprint import pprint
import traceback
import json
from tqdm import tqdm

### 2.Input and Output Path
* **project_id**: This contains the project ID of the project.
* **gcs_input_path**: This contains the storage bucket path of the input files.
* **gcs_output_path**: This contains the storage bucket path of the output files.
* **headers_entities**: This contains the table's header as the key and the associated entity name as its value.
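
The notebook later splits the `gs://` paths on `/` to obtain the bucket name and the folder prefix. A minimal sketch of that split is shown below; `split_gcs_path` is a hypothetical helper (not part of `utilities.py`) and the bucket names are placeholders.

```python
def split_gcs_path(gcs_path: str) -> tuple:
    """Split 'gs://bucket/prefix/' into (bucket, prefix)."""
    if not gcs_path.startswith("gs://"):
        raise ValueError(f"Expected a gs:// path, got: {gcs_path!r}")
    bucket, _, prefix = gcs_path[len("gs://"):].partition("/")
    # Make sure a non-empty prefix ends with "/" so file names can be appended.
    if prefix and not prefix.endswith("/"):
        prefix += "/"
    return bucket, prefix


# Placeholder paths, for illustration only.
print(split_gcs_path("gs://my-bucket/invoices/input"))
print(split_gcs_path("gs://my-bucket/"))
```

Normalizing the prefix this way avoids output names such as `outputfile.json` when the trailing slash is left off `gcs_output_path`.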

In [None]:
# input details
project_id = "xx-xx-xx"
gcs_input_path = "gs://xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx/"
gcs_output_path = "gs://xxxxxxxxxxxxxxxxxxxxxxxxxxxxx/"

headers_entities = {
    "From": "line_item/date_from",
    "To": "line_item/date_to",
    "DESCRIPTION": "line_item/purchase_date",
    "Date": "line_item/purchase_date",
}
unique_entities = []
for value in headers_entities.values():
    if value not in unique_entities:
        unique_entities.append(value)

For instance, in `headers_entities`, supply the table's header text as the key and the associated entity name as its value, as illustrated below. All date values under that header are then labeled with that entity name.

headers_entities={'From':'line_item/date_from'}

<img src="./images/image_1.png" width=800 height=400></img>
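
The sketch below shows, under simplifying assumptions, how a header found by OCR could be resolved to its entity types with this mapping. `entity_types_for_header` is a hypothetical helper written for this example; the notebook performs the equivalent lookup inline.

```python
headers_entities = {
    "From": "line_item/date_from",
    "To": "line_item/date_to",
    "DESCRIPTION": "line_item/purchase_date",
    "Date": "line_item/purchase_date",
}


def entity_types_for_header(header_text: str, mapping: dict):
    """Return (parent_type, child_type) for a header token, or None if unmapped."""
    # The notebook strips newlines and spaces from the token text before lookup.
    key = header_text.replace("\n", "").replace(" ", "")
    child_type = mapping.get(key)
    if child_type is None:
        return None
    # The parent type is the part before the first "/", e.g. "line_item".
    return child_type.split("/")[0], child_type


# De-duplicate the entity names while keeping their original order.
unique_entities = list(dict.fromkeys(headers_entities.values()))

print(entity_types_for_header("From\n", headers_entities))
print(entity_types_for_header("from", headers_entities))
print(unique_entities)
```

The lookup is an exact, case-sensitive match, so `from` would not be recognized unless it is added as its own key.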

### 3.Run the Code

In [None]:
def get_token(json_dict: object, page: int, text_anchors_check: list):
    """Finds the OCR token that matches a text anchor and returns its X and Y coordinates.

    Args:
        json_dict (object) : The document object containing pages and tokens.
        page (int) : The index of the page where the token is searched.
        text_anchors_check (list) : A list holding one dictionary with the
            'startIndex' and 'endIndex' of the text to locate.

    Returns:
        A tuple with three elements : A dictionary with keys 'min_x', 'min_y', 'max_x', and 'max_y' ; list containing textanchors ; confidence

    Raises:
        ValueError : If no token on the page matches the text anchor.
    """

    target_start = int(text_anchors_check[0]["startIndex"])
    target_end = int(text_anchors_check[0]["endIndex"])
    matched_token = None
    # Try an exact index match first, then allow an offset of up to 2 characters.
    for tolerance in (0, 2):
        for token in json_dict.pages[page].tokens:
            segments = token.layout.text_anchor.text_segments
            if not segments:
                continue
            if (
                abs(int(segments[0].start_index) - target_start) <= tolerance
                and abs(int(segments[0].end_index) - target_end) <= tolerance
            ):
                matched_token = token
        if matched_token is not None:
            break
    if matched_token is None:
        # Callers skip the date when no token can be located.
        raise ValueError("No token found for the given text anchor")

    vertices = matched_token.layout.bounding_poly.normalized_vertices
    return (
        {
            "min_x": min(vertex.x for vertex in vertices),
            "min_y": min(vertex.y for vertex in vertices),
            "max_x": max(vertex.x for vertex in vertices),
            "max_y": max(vertex.y for vertex in vertices),
        },
        matched_token.layout.text_anchor.text_segments,
        matched_token.layout.confidence,
    )


def get_page_wise_entities(json_dict: object) -> dict:
    """
    Groups the entities of the document by the page they appear on.

    Args:
       json_dict (object) : This contains loaded document object.

    Returns:
            Returns a dictionary having a structure of {page: [entities]}.
    """

    entities_page = {}
    for entity in json_dict.entities:
        # Default to the first page; the page field is left unset (0) for page 0.
        page = "0"
        try:
            if entity.page_anchor.page_refs[0].page:
                page = str(entity.page_anchor.page_refs[0].page)
        except IndexError:
            # Skip entities that have no page reference.
            continue
        entities_page.setdefault(page, []).append(entity)
    return entities_page


def get_min_max_y_lineitem(json_dict: object, page: int, ent2: list):
    """
    Extracts the minimum and maximum Y-coordinates spanned by the line items on a page.

    Args:
     json_dict (object): Document object containing the JSON structure.
     page (int): Integer representing the page number.
     ent2 (list): List of entities to be considered.

    Returns:
     min_y_line (float): Minimum Y-coordinate for the line items.
     max_y_line (float): Maximum Y-coordinate for the line items.

    Raises:
     ValueError: If the page has too few line items to define a table region.
    """

    line_items_all = [
        entity for entity in ent2 if entity.properties and entity.type == "line_item"
    ]
    # Require more than one line item, or a single line item with more than two
    # child properties, before treating the region as a table.
    if not line_items_all or not (
        len(line_items_all) > 1 or len(line_items_all[0].properties) > 2
    ):
        raise ValueError("Not enough line items on the page")

    min_y_line = 1
    max_y_line = 0
    for line_item in line_items_all:
        norm_ver = line_item.page_anchor.page_refs[0].bounding_poly.normalized_vertices
        if not norm_ver:
            continue
        min_y_line = min(min_y_line, min(vertex.y for vertex in norm_ver))
        max_y_line = max(max_y_line, max(vertex.y for vertex in norm_ver))

    return min_y_line, max_y_line


def get_date_entities(json_dict: object, headers_entities: dict) -> object:
    """
    This function retrieves the dates from the document present in the line items.

    Args:
        json_dict (object): Document object containing the JSON structure.
        headers_entities (dict): Dictionary containing the headers to be annotated in the document.

    Returns:
       Returns a document object after being annotated in the document.
    """

    def get_dates(json_dict):
        import re

        date_pattern = r"\b(?:\d{1,2}\/\d{1,2}\/\d{2,4}|\d{1,2}-\d{1,2}-\d{2,4}|(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)[a-z]* \d{1,2} \d{4})\b"
        matches = list(re.finditer(date_pattern, json_dict.text))
        match_dates_dict = {}
        n = 0
        for match in matches:
            match_dates_dict[n] = {
                "match": match.group(),
                "textanc": [
                    {"startIndex": str(match.start()), "endIndex": str(match.end())}
                ],
            }
            n = n + 1
        return match_dates_dict

    def line_item_dates(
        json_dict, page, match_dates_dict, min_y_lineitem, max_y_lineitem
    ):
        final_dates_dict = {}
        m = 0

        for i, v in match_dates_dict.items():
            try:
                ver, text_anc_token, confidence = get_token(
                    json_dict, page, v["textanc"]
                )

                if (
                    ver["min_y"] >= min_y_lineitem - 0.02
                    and ver["max_y"] <= max_y_lineitem + 0.02
                ):
                    match_dates_dict[i]["ver"] = ver
                    final_dates_dict[m] = {
                        "date": v["match"],
                        "textanc": text_anc_token,
                        "normalizedver": ver,
                        "confidence": confidence,
                    }
                    m = m + 1
            except:
                continue
        return final_dates_dict

    def get_headers_dict(
        json_dict, page, headers_entities, min_y_lineitem, max_y_lineitem
    ):
        header_exist_dict = {}
        list_headers = list(headers_entities.keys())
        b = 0  # Running index of matched headers; must be set before first use.

        def above_table(min_y, max_y):
            # Token starts at or slightly above the top of the line item region.
            return min_y <= min_y_lineitem and abs(min_y - min_y_lineitem) <= 0.1

        def inside_table(min_y, max_y):
            # Token lies strictly within the line item region.
            return min_y > min_y_lineitem and max_y < max_y_lineitem

        # First pass collects headers just above the line items, second pass
        # collects header tokens that fall inside the line item region.
        for in_region in (above_table, inside_table):
            for token in json_dict.pages[page].tokens:
                try:
                    vertices = token.layout.bounding_poly.normalized_vertices
                    min_x = min(vertex.x for vertex in vertices)
                    min_y = min(vertex.y for vertex in vertices)
                    max_x = max(vertex.x for vertex in vertices)
                    max_y = max(vertex.y for vertex in vertices)
                    segment = token.layout.text_anchor.text_segments[0]
                    start_1 = segment.start_index or 0
                    end_1 = segment.end_index
                except (ValueError, IndexError):
                    # Skip tokens without a bounding box or text segment.
                    continue
                if not in_region(min_y, max_y):
                    continue
                text_1 = (
                    json_dict.text[int(start_1) : int(end_1)]
                    .replace("\n", "")
                    .replace(" ", "")
                )
                if text_1 in list_headers:
                    header_exist_dict[b] = {
                        text_1: {
                            "pageanc": {
                                "min_x": min_x,
                                "min_y": min_y,
                                "max_x": max_x,
                                "max_y": max_y,
                            },
                            "text_anc": {"startIndex": start_1, "endIndex": end_1},
                        }
                    }
                    b += 1
        return header_exist_dict

    def create_entities(final_match, page, json_dict):
        for i1, v1 in final_match.items():
            if (
                v1["considered_ent"] != ""
                and v1["considered_ent"] != {}
                and len(v1["considered_ent"]) != 0
            ):
                parent_entity = documentai.Document.Entity()
                new_entity = documentai.Document.Entity()
                new_entity.confidence = v1["date_ent"]["confidence"]
                new_entity.mention_text = v1["date_ent"]["date"]
                min_x = v1["date_ent"]["normalizedver"]["min_x"]
                min_y = v1["date_ent"]["normalizedver"]["min_y"]
                max_x = v1["date_ent"]["normalizedver"]["max_x"]
                max_y = v1["date_ent"]["normalizedver"]["max_y"]
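                # Vertices in clockwise order: top-left, top-right,
                # bottom-right, bottom-left.
                ver_1 = [
                    {"x": min_x, "y": min_y},
                    {"x": max_x, "y": min_y},
                    {"x": max_x, "y": max_y},
                    {"x": min_x, "y": max_y},
                ]
                page_ref_1 = new_entity.page_anchor.PageRef()
                # Record the page the date was found on.
                page_ref_1.page = int(page)
                page_ref_1.bounding_poly.normalized_vertices.extend(ver_1)
                new_entity.page_anchor.page_refs.append(page_ref_1)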
                new_entity.text_anchor.text_segments = v1["date_ent"]["textanc"]
                new_entity.type = headers_entities[list(v1["considered_ent"].keys())[0]]
                parent_entity.properties.append(new_entity)
                parent_entity.confidence = new_entity.confidence
                parent_entity.mention_text = new_entity.mention_text
                parent_entity.page_anchor = new_entity.page_anchor
                parent_entity.text_anchor = new_entity.text_anchor
                parent_entity.type = headers_entities[
                    list(v1["considered_ent"].keys())[0]
                ].split("/")[0]
                json_dict.entities.append(parent_entity)
        return json_dict

    def final_match(dates_1, headers_1):
        temp_dic = {}
        final_match = {}
        q = 0
        duplicates = []
        for i, j in dates_1.items():
            diff = 1
            considered_ent = ""
            considered_ent_list = []
            if j not in duplicates:
                for k, l in headers_1.items():
                    for k2, l2 in l.items():
                        if (
                            abs(j["normalizedver"]["min_x"] - l2["pageanc"]["min_x"])
                            <= 0.05
                            and abs(
                                j["normalizedver"]["max_x"] - l2["pageanc"]["max_x"]
                            )
                            <= 0.1
                        ):
                            if (
                                j["normalizedver"]["max_y"] - l2["pageanc"]["min_y"]
                                >= 0
                            ):
                                if (
                                    diff
                                    > j["normalizedver"]["max_y"]
                                    - l2["pageanc"]["min_y"]
                                ):
                                    temp_dic = {}
                                    diff = (
                                        j["normalizedver"]["max_y"]
                                        - l2["pageanc"]["min_y"]
                                    )
                                    temp_dic[k2] = l2
            duplicates.append(j)
            considered_ent = temp_dic
            temp_dic = {}
            final_match[q] = {"date_ent": j, "considered_ent": considered_ent}
            q += 1

        return final_match

    date_entities_final = []

    try:
        page_wise_ent = get_page_wise_entities(json_dict)
    except Exception as e:
        print("page wise entities issue--> ", e)
        return json_dict

    for page, page_entities in page_wise_ent.items():
        try:
            min_y_lineitem, max_y_lineitem = get_min_max_y_lineitem(
                json_dict, int(page), page_entities
            )
        except Exception as e:
            print("COULDNT GET MIN Y AND MAX Y FOR LINE ITEMS--> ", e)
            continue

        try:
            match_dates_dict = get_dates(json_dict)
        except Exception as e:
            print("COULDNT FIND DATES IN THE TEXT--> ", e)
            continue

        try:
            final_dates_dict = line_item_dates(
                json_dict,
                int(page),
                match_dates_dict,
                min_y_lineitem,
                max_y_lineitem,
            )
        except Exception as e:
            print("NO DATES IN LINE ITEM RANGE--> ", e)
            continue

        try:
            header_exist_dict = get_headers_dict(
                json_dict,
                int(page),
                headers_entities,
                min_y_lineitem,
                max_y_lineitem,
            )
        except Exception as e:
            print("NO HEADERS FOUND MATCHING--> ", e)
            continue

        try:
            # Use a separate name so the nested final_match() function is not
            # overwritten and stays callable for the next page.
            matched_dates = final_match(final_dates_dict, header_exist_dict)
        except Exception as e:
            print("Match not found-->", e)
            continue

        try:
            json_dict = create_entities(matched_dates, page, json_dict)
        except Exception as e:
            print("COULDNT CREATE ENTITIES--> ", e)

    return json_dict
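
To make the matching step easier to follow, here is a simplified sketch of the rule `final_match` applies: a date is assigned to the header whose left and right edges line up with it (within 0.05 and 0.1 in normalized coordinates) and whose top edge is closest above the bottom of the date. `nearest_header` and the sample boxes are hypothetical and work on plain dictionaries rather than Document AI objects.

```python
def nearest_header(date_box: dict, headers: dict, x_min_tol=0.05, x_max_tol=0.1):
    """Pick the closest header above the date that is horizontally aligned with it."""
    best_name, best_gap = None, 1.0
    for name, box in headers.items():
        aligned = (
            abs(date_box["min_x"] - box["min_x"]) <= x_min_tol
            and abs(date_box["max_x"] - box["max_x"]) <= x_max_tol
        )
        # Vertical distance from the top of the header to the bottom of the date.
        gap = date_box["max_y"] - box["min_y"]
        if aligned and 0 <= gap < best_gap:
            best_name, best_gap = name, gap
    return best_name


# Made-up normalized coordinates for two headers and two dates.
headers = {
    "From": {"min_x": 0.10, "max_x": 0.16, "min_y": 0.30, "max_y": 0.32},
    "To": {"min_x": 0.30, "max_x": 0.34, "min_y": 0.30, "max_y": 0.32},
}
date_under_from = {"min_x": 0.11, "max_x": 0.20, "min_y": 0.40, "max_y": 0.42}
date_elsewhere = {"min_x": 0.60, "max_x": 0.70, "min_y": 0.40, "max_y": 0.42}

print(nearest_header(date_under_from, headers))
print(nearest_header(date_elsewhere, headers))
```

A date that is not aligned with any configured header gets no match and is therefore not annotated, which is one reason centered or right-aligned columns may need different tolerances.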

In [None]:
def load_json_from_gcs(bucket_name, blob_name):
    """Load a JSON file from a Google Cloud Storage bucket."""
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(blob_name)

    # Download the contents of the blob as a string and then parse it as JSON
    json_data = json.loads(blob.download_as_text())
    return json_data

In [None]:
file_names_list, file_dict = utilities.file_names(gcs_input_path)
input_bucket_name = gcs_input_path.split("/")[2]
output_bucket_name = gcs_output_path.split("/")[2]
output_prefix = "/".join(gcs_output_path.split("/")[3:])
for filename, filepath in tqdm(file_dict.items(), desc="Progress"):
    # Only process JSON files; filepath is the blob path inside the input bucket.
    if not filepath.endswith(".json"):
        continue
    print("gs://" + input_bucket_name + "/" + filepath)
    json_dict = load_json_from_gcs(input_bucket_name, filepath)
    json_dict = documentai.Document.from_json(json.dumps(json_dict))
    json_updated = get_date_entities(json_dict, headers_entities)

    utilities.store_document_as_json(
        documentai.Document.to_json(json_updated),
        output_bucket_name,
        output_prefix + filename,
    )

#### **Note**: After running this **date entities code**, run the **line item improver code** to combine these date entities with the other line items in their respective rows.


### 4.Output after running line item improver code


<img src="./images/output_sample_1_2.png" width=800 height=400></img>
<img src="./images/output_sample_1_1.png" width=800 height=400></img>

### 5.Edge cases

Line item merging may not work as expected for some documents because of their layout, as illustrated in the screenshots below.


<img src="./images/output_sample_2_2.png" width=800 height=400></img>
<img src="./images/output_sample_2_1.png" width=800 height=400></img>