# Signature Detection

* Author: docai-incubator@google.com

## Disclaimer

This tool is not supported by the Google engineering team or product team. It is provided and supported on a best-effort basis by the **DocAI Incubator Team**. No guarantees of performance are implied.

## Objective

This document provides a step-by-step guide to help you extract the signature field descriptions from a PDF file and save them as entities in a JSON file.

## Prerequisites
* Python: Jupyter Notebook (Vertex AI).
* Storage Bucket.
* OCR Processor.

## Step by Step Procedure

### 1. Import Modules/Packages

In [None]:
# Run this cell to download the utilities module (utilities.py) into the
# notebook's working directory; the import cell loads helper functions from it.
!wget https://raw.githubusercontent.com/GoogleCloudPlatform/document-ai-samples/main/incubator-tools/best-practices/utilities/utilities.py

In [None]:
# Install the Document AI and Cloud Storage client libraries.
# NOTE(review): the import cell also uses cv2, PIL and numpy, which this command
# does not install - presumably they ship with the notebook image; confirm, or
# install them separately if the imports fail.
!pip install google-cloud-documentai google-cloud-storage

In [None]:
import base64
import io
from io import BytesIO
import json
import os
from pathlib import Path
import re
from typing import Dict
from typing import Any, Dict, List, Optional, Sequence, Tuple, Union

import cv2
from google.cloud import documentai_v1beta3 as documentai
from google.cloud import storage
from IPython.display import display
import numpy as np
import PIL
from PIL import Image

from utilities import (
    batch_process_documents_sample,
    bbox_maker,
    documentai_json_proto_downloader,
    file_names,
    store_document_as_json,
)

### 2. Input Details

* **project_id**: GCP project ID.
* **location**: Location of the created processor (`us` or `eu`).
* **processor_id**: ID of the OCR processor.
* **input_gcs_path**: GCS path of the parent folder whose sub-folders contain the input files. Please follow the folder structure described earlier.
* **output_gcs_path**: GCS path where the output JSON files are saved.
* **output_updated_gcs_path**: GCS path where the updated output JSON files are saved.


In [None]:
# Processor to run: GCP project, processor location and OCR processor id.
project_id = "<<project_id>>"
location = "<<location>>"
processor_id = "<<processor_id>>"

# Cloud Storage folders: input PDFs and the OCR output JSON files.
input_gcs_path = "gs://<<bucket_path>>/<<pdf_files>>/"
output_gcs_path = "gs://<<bucket_path>>/<<output_ocr_files>>/"

# Contains Signature description
output_updated_gcs_path = "gs://<<bucket_path>>/<<final_output_files>>/"

### 3. Run the required functions

In [None]:
def get_token_xy(token: Any) -> Tuple[float, float, float, float]:
    """
    Return the extent of a token's bounding polygon in normalized coordinates.

    Args:
        token (Any): Object exposing ``layout.bounding_poly.normalized_vertices``,
            a collection of points that each carry ``x`` and ``y`` attributes.

    Returns:
        Tuple[float, float, float, float]: ``(min_x, min_y, max_x, max_y)``
        computed over every vertex of the polygon.
    """
    corners = token.layout.bounding_poly.normalized_vertices
    xs = [corner.x for corner in corners]
    ys = [corner.y for corner in corners]
    return min(xs), min(ys), max(xs), max(ys)


def get_token_data(
    json_dict: Any,
    min_x: float,
    max_x: float,
    min_y: float,
    max_y: float,
    page_num: int,
) -> Tuple[str, List[Dict[str, Any]], List[Dict[str, float]]]:
    """
    Collects the tokens of one page that lie inside a normalized bounding box.

    A token is selected when its own bounding box fits inside the requested box,
    with a small tolerance: it may stick out by up to 0.005 vertically and 0.02
    horizontally (normalized units) and still be selected. Tokens that carry no
    text segments are skipped, since they contribute no text.

    Args:
        json_dict (Any): Document object exposing ``pages`` (each with
            ``page_number`` and ``tokens``) and the full document ``text``.
        min_x (float): Minimum x-coordinate of the bounding box.
        max_x (float): Maximum x-coordinate of the bounding box.
        min_y (float): Minimum y-coordinate of the bounding box.
        max_y (float): Maximum y-coordinate of the bounding box.
        page_num (int): Zero-based page index; it is compared against
            ``page.page_number - 1``.

    Returns:
        Tuple[str, List[Dict[str, Any]], List[Dict[str, float]]]: A tuple containing:
            1. The text of the selected tokens, concatenated in order of their
               start index in ``json_dict.text``.
            2. Every text segment of the selected tokens, in token order (the
               same segments the text in item 1 is built from).
            3. Four ``{"x", "y"}`` vertices enclosing all selected tokens, or an
               empty list when no token was selected.
    """
    y_allowance = 0.005
    x_allowance = 0.02

    text_anc = []
    xs: List[float] = []
    ys: List[float] = []

    for page in json_dict.pages:
        if page_num != page.page_number - 1:
            continue
        for token in page.tokens:
            minx_token, miny_token, maxx_token, maxy_token = get_token_xy(token)
            inside_box = (
                min_y <= miny_token + y_allowance
                and max_y >= maxy_token - y_allowance
                and min_x <= minx_token + x_allowance
                and max_x >= maxx_token - x_allowance
            )
            if not inside_box:
                continue
            segments = token.layout.text_anchor.text_segments
            if not segments:
                # Indexing segments[0] here used to raise IndexError.
                continue
            # Keep every segment so the anchors cover the whole mention text.
            text_anc.extend(segments)
            xs.extend([minx_token, maxx_token])
            ys.extend([miny_token, maxy_token])

    page_anc = []
    if xs:
        left, right = min(xs), max(xs)
        top, bottom = min(ys), max(ys)
        # NOTE(review): vertex order is left-top, right-top, left-bottom,
        # right-bottom (not a clockwise walk); kept as-is because the rest of
        # this notebook builds boxes the same way - confirm before changing.
        page_anc = [
            {"x": left, "y": top},
            {"x": right, "y": top},
            {"x": left, "y": bottom},
            {"x": right, "y": bottom},
        ]

    # sorted() is stable, so segments sharing a start index keep token order.
    ordered = sorted(text_anc, key=lambda seg: seg.start_index)
    mention_text = "".join(
        json_dict.text[seg.start_index : seg.end_index] for seg in ordered
    )

    return mention_text, text_anc, page_anc


def create_new_entity(
    token_data: Tuple[str, List[Dict[str, Any]], List[Dict[str, float]]],
    entity_type: str,
    page_number: int,
) -> Dict[str, Any]:
    """
    Builds an entity dictionary from collected token data.

    Args:
        token_data (Tuple[str, List[Dict[str, Any]], List[Dict[str, float]]]):
            ``(text, segments, vertices)`` where each segment exposes
            ``start_index`` / ``end_index`` attributes and ``vertices`` is the
            list stored as the entity's normalized bounding polygon.
        entity_type (str): Value stored under the entity's ``"type"`` key.
        page_number (int): Page of the entity; stored as a string.

    Returns:
        Dict[str, Any]: Entity with ``mentionText``, ``pageAnchor``,
        ``textAnchor`` and ``type`` keys.
    """
    mention_text = token_data[0]

    segments = []
    for anchor in token_data[1]:
        segments.append(
            {"endIndex": anchor.end_index, "startIndex": anchor.start_index}
        )

    page_ref = {
        "boundingPoly": {"normalizedVertices": token_data[2]},
        "layoutType": "VISUAL_ELEMENT",
        "page": str(page_number),
    }

    return {
        "mentionText": mention_text,
        "pageAnchor": {"pageRefs": [page_ref]},
        "textAnchor": {"content": mention_text, "textSegments": segments},
        "type": entity_type,
    }


def signatureDetection(
    json_data: Dict[str, Any],
    normalizedVertices: List[Dict[str, float]],
    pageNumber: int,
    blankLinePixelCount: int = 375,
    signatureThresholdPixelCount: int = 375,
) -> bool:
    """
    Detects if a signature exists within a specified bounding box in an image.

    The page image is cropped to the bounding box (padded by 1 px left, 3 px
    top and 5 px right), JPEG-encoded, decoded with OpenCV and binarised at
    127. The region counts as signed when the number of black pixels exceeds
    both thresholds.

    The JPEG round-trip is done in memory, so no ``cropped.jpeg`` file is
    written to the working directory. The crop box is clamped to the image
    bounds, because PIL fills out-of-image areas with zeros, which would be
    counted as black "ink".

    Args:
        json_data (Dict[str, Any]): Document dictionary; for the given page it
            must provide ``image.height``, ``image.width`` and the
            base64-encoded ``image.content``.
        normalizedVertices (List[Dict[str, float]]): The normalized vertices of
            the bounding box, each with ``"x"`` and ``"y"`` keys.
        pageNumber (int): Index into ``json_data["pages"]``.
        blankLinePixelCount (int): Minimum pixel count to distinguish blank areas.
        signatureThresholdPixelCount (int): Threshold pixel count for detecting
            signatures.

    Returns:
        bool: True if a signature is detected, False otherwise (including when
        the bounding box lies entirely outside the page image).
    """
    page_image = json_data["pages"][pageNumber]["image"]
    img_height = page_image["height"]
    img_width = page_image["width"]

    xs = [vertex["x"] for vertex in normalizedVertices]
    ys = [vertex["y"] for vertex in normalizedVertices]

    image = Image.open(io.BytesIO(base64.b64decode(page_image["content"])))

    # Same padding as before; rounded the way Image.crop rounds its box, then
    # clamped so no zero-filled (black) padding enters the crop.
    left = max(int(round(min(xs) * img_width - 1)), 0)
    top = max(int(round(min(ys) * img_height - 3)), 0)
    right = min(int(round(max(xs) * img_width + 5)), image.width)
    bottom = min(int(round(max(ys) * img_height)), image.height)
    if right <= left or bottom <= top:
        # Nothing of the box overlaps the page, so there is nothing to inspect.
        return False

    cropped_image = image.crop((left, top, right, bottom))

    # Encode to JPEG in memory and decode with the same OpenCV flag (value 2)
    # that the previous file-based cv2.imread call used.
    buffer = io.BytesIO()
    cropped_image.save(buffer, format="JPEG")
    encoded = np.frombuffer(buffer.getvalue(), dtype=np.uint8)
    cropped_img = cv2.imdecode(encoded, cv2.IMREAD_ANYDEPTH)

    # cv2.threshold returns (threshold_used, binarised_image).
    _, cropped_bw_image = cv2.threshold(cropped_img, 127, 255, cv2.THRESH_BINARY)
    cropped_black_pixel = int(np.count_nonzero(cropped_bw_image == 0))

    return (cropped_black_pixel > blankLinePixelCount) and (
        cropped_black_pixel > signatureThresholdPixelCount
    )

### 4. Run the code

In [None]:
def _find_label_box(
    document: Any, start: int, end: int
) -> Optional[Tuple[List[Dict[str, float]], int]]:
    """
    Locates the tokens that make up one regex match and returns their box.

    A token belongs to the match when one of its text segments starts no
    earlier than ``start - 1`` and ends no later than ``end + 2``.

    Args:
        document (Any): Document object exposing ``pages`` with ``tokens``.
        start (int): Start index of the match in the document text.
        end (int): End index of the match in the document text.

    Returns:
        Optional[Tuple[List[Dict[str, float]], int]]: Four ``{"x", "y"}``
        vertices enclosing the matching tokens and the zero-based page index,
        or None when no token falls inside the window.
    """
    x_ver: List[float] = []
    y_ver: List[float] = []
    page_num = None
    for page in document.pages:
        for token in page.tokens:
            norm_ver = token.layout.bounding_poly.normalized_vertices
            for seg in token.layout.text_anchor.text_segments:
                if seg.start_index >= start - 1 and seg.end_index <= end + 2:
                    for ver in norm_ver:
                        x_ver.append(ver.x)
                        y_ver.append(ver.y)
                        page_num = page.page_number - 1

    if not x_ver:
        # e.g. the match sits inside a longer token; min()/max() on the empty
        # lists would raise ValueError.
        return None

    nor_ver = [
        {"x": min(x_ver), "y": min(y_ver)},
        {"x": max(x_ver), "y": min(y_ver)},
        {"x": min(x_ver), "y": max(y_ver)},
        {"x": max(x_ver), "y": max(y_ver)},
    ]
    return nor_ver, page_num


if __name__ == "__main__":
    res = batch_process_documents_sample(
        project_id=project_id,
        location=location,
        processor_id=processor_id,
        gcs_input_uri=input_gcs_path,
        gcs_output_uri=output_gcs_path,
    )
    print("Batch Process Completed")
    print("Signature Detection InProgess")
    file_names_list, file_path_dict = file_names(output_gcs_path)

    # "gs://<bucket>/<prefix>/" -> bucket is element 2, the rest is the prefix.
    ocr_bucket = output_gcs_path.split("/")[2]
    updated_path_parts = output_updated_gcs_path.split("/")
    updated_bucket = updated_path_parts[2]
    updated_prefix = "/".join(updated_path_parts[3:])

    # Matches the word "Initial" case-sensitively. The parentheses only form a
    # capture group; they do not match literal "(" / ")" characters.
    pattern = r"(Initial)"

    for files, blob_path in file_path_dict.items():
        json_proto_data = documentai_json_proto_downloader(ocr_bucket, blob_path)
        json_data = documentai.Document.to_dict(json_proto_data)
        new_entities = []

        for match in re.finditer(pattern, json_proto_data.text):
            label = _find_label_box(json_proto_data, match.start(), match.end())
            if label is None:
                # No token could be tied to this match; skip it instead of crashing.
                continue
            nor_ver, page_num = label

            # NOTE(review): bbox_maker is presumably [min_x, min_y, max_x, max_y];
            # the indexing below relies on that - confirm against utilities.py.
            nv_box = bbox_maker(nor_ver)
            # Region checked for ink: y from (nv_box[1] - 0.02) to nv_box[1],
            # x widened by 0.01 on both sides of the label.
            updated_nv = [
                {"x": nv_box[0] - 0.01, "y": nv_box[1] - 0.02},
                {"x": nv_box[0] - 0.01, "y": nv_box[1]},
                {"x": nv_box[2] + 0.01, "y": nv_box[1] - 0.02},
                {"x": nv_box[2] + 0.01, "y": nv_box[1]},
            ]

            if not signatureDetection(json_data, updated_nv, page_num):
                continue

            # Collect the text from 0.01 past the label's nv_box[2] up to x = 1,
            # within the combined vertical span of the label and checked region.
            updated_nv_box = bbox_maker(updated_nv)
            min_y = min(updated_nv_box[1], nv_box[1])
            max_y = max(updated_nv_box[3], nv_box[3])
            min_x = max(updated_nv_box[2], nv_box[2]) + 0.01
            max_x = 1
            token_data = get_token_data(
                json_proto_data, min_x, max_x, min_y, max_y, page_num
            )
            new_entities.append(
                create_new_entity(token_data, "initial_label", page_num)
            )

        json_data.setdefault("entities", []).extend(new_entities)
        store_document_as_json(
            json.dumps(json_data),
            updated_bucket,
            updated_prefix + files,
        )
    print("Signature Detection Completed and Updated JSON.")

### Output Details

### Before Detection
<img src='./images/before_detection.png' width=400 height=600 alt="Sample output before detection">
### After Detection
<img src='./images/after_detection.png' width=400 height=600 alt="Sample output after detection">