incubator-tools/signature_detection/signature_detection.ipynb

{ "cells": [ { "cell_type": "markdown", "id": "d9ad8788-654c-4e38-9d6d-42158d95c6ee", "metadata": {}, "source": [ "# Signature Detection" ] },
{ "cell_type": "markdown", "id": "08400e10-6639-43a6-8e2a-0b4f28ba08e2", "metadata": {}, "source": [ "* Author: docai-incubator@google.com" ] },
{ "cell_type": "markdown", "id": "8cc2d117-9e2a-4ebe-8aef-94ad4f2b46b1", "metadata": {}, "source": [ "## Disclaimer\n", "\n", "This tool is not supported by the Google engineering team or product team. It is provided and supported on a best-effort basis by the **DocAI Incubator Team**. No guarantees of performance are implied." ] },
{ "cell_type": "markdown", "id": "94f1d1c0-823d-46d4-9a4a-e5ffa56c6b55", "metadata": {}, "source": [ "## Objective\n", "\n", "This document provides a step-by-step guide to detecting signatures in a PDF file, extracting the corresponding signature field descriptions, and saving them as entities in the output JSON file." ] },
{ "cell_type": "markdown", "id": "ffe00a75-567e-4f33-b4b4-aa72f9f4fe50", "metadata": {}, "source": [ "## Prerequisites\n", "* Python: Jupyter Notebook (Vertex AI Workbench).\n", "* Cloud Storage bucket.\n", "* Document AI OCR processor." ] },
{ "cell_type": "markdown", "id": "9b33b7f0-baf8-421c-b476-2d25b1ee9b02", "metadata": {}, "source": [ "## Step by Step Procedure" ] },
{ "cell_type": "markdown", "id": "eadfe192-5691-4053-8670-32ef3e5bea8c", "metadata": {}, "source": [ "### 1. Import Modules/Packages" ] },
{ "cell_type": "code", "execution_count": null, "id": "ea066b24-5e3b-4c28-a56f-263fc6e9aa67", "metadata": { "tags": [] }, "outputs": [], "source": [ "# Run this cell to download the utilities module\n", "!wget https://raw.githubusercontent.com/GoogleCloudPlatform/document-ai-samples/main/incubator-tools/best-practices/utilities/utilities.py" ] },
{ "cell_type": "code", "execution_count": null, "id": "2d8e2a54-e250-4924-8f58-9a4642e6ddf3", "metadata": { "tags": [] }, "outputs": [], "source": [ "!pip install google-cloud-documentai google-cloud-storage pillow opencv-python numpy" ] },
{ "cell_type": "code", "execution_count": null, "id": "6495b161-ac1b-40b2-a668-fe835da2950a", "metadata": { "tags": [] }, "outputs": [], "source": [
"import base64\n",
"import io\n",
"import json\n",
"import os\n",
"import re\n",
"from typing import Any, Dict, List, Tuple\n",
"\n",
"import cv2\n",
"import numpy as np\n",
"from google.cloud import documentai_v1beta3 as documentai\n",
"from google.cloud import storage\n",
"from PIL import Image\n",
"\n",
"from utilities import (\n",
"    batch_process_documents_sample,\n",
"    bbox_maker,\n",
"    documentai_json_proto_downloader,\n",
"    file_names,\n",
"    store_document_as_json,\n",
")" ] },
{ "cell_type": "markdown", "id": "1709ea5e-bf14-44ab-9963-cda14b092dc2", "metadata": {}, "source": [ "### 2. Input Details" ] },
{ "cell_type": "markdown", "id": "7ce030a1-663b-445e-a38e-24dc415942c2", "metadata": {}, "source": [ "* **project_id**: GCP project ID\n", "* **location**: Provide the location of the processor created (`us` or `eu`)\n", "* **processor_id**: Provide the OCR processor ID\n", "* **input_gcs_path**: Provide the GCS path of the parent folder whose sub-folders contain the input PDF files\n", "* **output_gcs_path**: Provide the GCS path where the OCR output JSON files have to be saved\n", "* **output_updated_gcs_path**: Provide the GCS path where the updated output JSON files (with signature description entities) have to be saved\n" ] },
{ "cell_type": "code", "execution_count": null, "id": "36eb0a45-7e29-41b4-a9e0-44dab8f55c17", "metadata": {}, "outputs": [], "source": [
"project_id = \"<<project_id>>\"\n",
"location = \"<<location>>\"\n",
"processor_id = \"<<processor_id>>\"\n",
"input_gcs_path = \"gs://<<bucket_path>>/<<pdf_files>>/\"\n",
"output_gcs_path = \"gs://<<bucket_path>>/<<output_ocr_files>>/\"\n",
"output_updated_gcs_path = (\n",
"    \"gs://<<bucket_path>>/<<final_output_files>>/\"  # Contains signature descriptions\n",
")" ] },
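{ "cell_type": "markdown", "id": "c0ffee01-0001-4a6b-9c3d-1a2b3c4d5e01", "metadata": {}, "source": [ "**Optional sanity check.** The cell below is a minimal sketch that lists the PDF files found under `input_gcs_path` before batch processing starts, using the `storage` client imported in step 1. It assumes the notebook credentials can read the bucket; it is not required for the main workflow." ] },
{ "cell_type": "code", "execution_count": null, "id": "c0ffee01-0002-4a6b-9c3d-1a2b3c4d5e02", "metadata": { "tags": [] }, "outputs": [], "source": [
"# Optional: list the PDF files under input_gcs_path to confirm the path is correct.\n",
"# Assumes the current credentials have read access to the bucket.\n",
"input_bucket_name = input_gcs_path.split(\"/\")[2]\n",
"input_prefix = \"/\".join(input_gcs_path.split(\"/\")[3:])\n",
"\n",
"storage_client = storage.Client(project=project_id)\n",
"pdf_blobs = [\n",
"    blob.name\n",
"    for blob in storage_client.list_blobs(input_bucket_name, prefix=input_prefix)\n",
"    if blob.name.lower().endswith(\".pdf\")\n",
"]\n",
"print(f\"Found {len(pdf_blobs)} PDF file(s) under {input_gcs_path}\")\n",
"for name in pdf_blobs[:10]:\n",
"    print(\" -\", name)" ] },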
{ "cell_type": "markdown", "id": "b4a4b1ed-2e86-4a8e-b00a-ebf6e41921ef", "metadata": {}, "source": [ "### 3. Run the required functions" ] },
{ "cell_type": "code", "execution_count": null, "id": "4b2afdc0-3873-4db9-a7b7-e64cf0dce00b", "metadata": { "tags": [] }, "outputs": [], "source": [
"def get_token_xy(token: Any) -> Tuple[float, float, float, float]:\n",
"    \"\"\"\n",
"    Extracts the normalized bounding box coordinates (min_x, min_y, max_x, max_y) of a token.\n",
"\n",
"    Args:\n",
"        token (Any): A token object with layout information.\n",
"\n",
"    Returns:\n",
"        Tuple[float, float, float, float]: The normalized bounding box coordinates.\n",
"    \"\"\"\n",
"    vertices = token.layout.bounding_poly.normalized_vertices\n",
"    minx_token, miny_token = min(point.x for point in vertices), min(\n",
"        point.y for point in vertices\n",
"    )\n",
"    maxx_token, maxy_token = max(point.x for point in vertices), max(\n",
"        point.y for point in vertices\n",
"    )\n",
"    return minx_token, miny_token, maxx_token, maxy_token\n",
"\n",
"\n",
"def get_token_data(\n",
"    json_dict: Any,\n",
"    min_x: float,\n",
"    max_x: float,\n",
"    min_y: float,\n",
"    max_y: float,\n",
"    page_num: int,\n",
") -> Tuple[str, List[Dict[str, Any]], List[Dict[str, float]]]:\n",
"    \"\"\"\n",
"    Extracts token data from the JSON dictionary based on provided bounding box coordinates and page number.\n",
"\n",
"    Args:\n",
"        json_dict (Any): The JSON dictionary containing token data.\n",
"        min_x (float): Minimum x-coordinate of the bounding box.\n",
"        max_x (float): Maximum x-coordinate of the bounding box.\n",
"        min_y (float): Minimum y-coordinate of the bounding box.\n",
"        max_y (float): Maximum y-coordinate of the bounding box.\n",
"        page_num (int): Page number.\n",
"\n",
"    Returns:\n",
"        Tuple[str, List[Dict[str, Any]], List[Dict[str, float]]]: A tuple containing:\n",
"            1. The extracted text from the tokens.\n",
"            2. A list of dictionaries containing text anchor data for each token.\n",
"            3. A list of dictionaries containing page anchor data.\n",
"    \"\"\"\n",
"    text_anc_temp = []\n",
"    text_anc = []\n",
"    page_anc_temp = {\"x\": [], \"y\": []}\n",
"    y_allowance = 0.005\n",
"    x_allowance = 0.02\n",
"\n",
"    for page in json_dict.pages:\n",
"        if page_num == page.page_number - 1:\n",
"            for token in page.tokens:\n",
"                minx_token, miny_token, maxx_token, maxy_token = get_token_xy(token)\n",
"                if (\n",
"                    min_y <= miny_token + y_allowance\n",
"                    and max_y >= maxy_token - y_allowance\n",
"                    and min_x <= minx_token + x_allowance\n",
"                    and max_x >= maxx_token - x_allowance\n",
"                ):\n",
"                    temp_anc = token.layout.text_anchor.text_segments[0]\n",
"                    text_anc.append(temp_anc)\n",
"                    page_anc_temp[\"x\"].extend([minx_token, maxx_token])\n",
"                    page_anc_temp[\"y\"].extend([miny_token, maxy_token])\n",
"                    for seg in token.layout.text_anchor.text_segments:\n",
"                        text_anc_temp.append([seg.start_index, seg.end_index])\n",
"\n",
"    page_anc = []\n",
"    if page_anc_temp != {\"x\": [], \"y\": []}:\n",
"        page_anc = [\n",
"            {\"x\": min(page_anc_temp[\"x\"]), \"y\": min(page_anc_temp[\"y\"])},\n",
"            {\"x\": max(page_anc_temp[\"x\"]), \"y\": min(page_anc_temp[\"y\"])},\n",
"            {\"x\": min(page_anc_temp[\"x\"]), \"y\": max(page_anc_temp[\"y\"])},\n",
"            {\"x\": max(page_anc_temp[\"x\"]), \"y\": max(page_anc_temp[\"y\"])},\n",
"        ]\n",
"\n",
"    mention_text = \"\"\n",
"    if text_anc_temp:\n",
"        sorted_data = sorted(text_anc_temp, key=lambda x: x[0])\n",
"        for start_index, end_index in sorted_data:\n",
"            mention_text += json_dict.text[start_index:end_index]\n",
"\n",
"    return mention_text, text_anc, page_anc\n",
"\n",
"\n",
"def create_new_entity(\n",
"    token_data: Tuple[str, List[Dict[str, Any]], List[Dict[str, float]]],\n",
"    entity_type: str,\n",
"    page_number: int,\n",
") -> Dict[str, Any]:\n",
"    \"\"\"\n",
"    Creates a new entity dictionary based on token data, entity type, and page number.\n",
"\n",
"    Args:\n",
"        token_data (Tuple[str, List[Dict[str, Any]], List[Dict[str, float]]]): Token data including text, text anchors, and bounding box coordinates.\n",
"        entity_type (str): The type of the entity.\n",
"        page_number (int): The page number of the entity.\n",
"\n",
"    Returns:\n",
"        Dict[str, Any]: The new entity dictionary.\n",
"    \"\"\"\n",
"    textsegments = [\n",
"        {\"endIndex\": seg.end_index, \"startIndex\": seg.start_index}\n",
"        for seg in token_data[1]\n",
"    ]\n",
"    new_entity = {\n",
"        \"mentionText\": token_data[0],\n",
"        \"pageAnchor\": {\n",
"            \"pageRefs\": [\n",
"                {\n",
"                    \"boundingPoly\": {\"normalizedVertices\": token_data[2]},\n",
"                    \"layoutType\": \"VISUAL_ELEMENT\",\n",
"                    \"page\": str(page_number),\n",
"                }\n",
"            ]\n",
"        },\n",
"        \"textAnchor\": {\"content\": token_data[0], \"textSegments\": textsegments},\n",
"        \"type\": entity_type,\n",
"    }\n",
"    return new_entity\n",
"\n",
"\n",
"def signatureDetection(\n",
"    json_data: Dict[str, Any],\n",
"    normalizedVertices: List[Dict[str, float]],\n",
"    pageNumber: int,\n",
"    blankLinePixelCount: int = 375,\n",
"    signatureThresholdPixelCount: int = 375,\n",
") -> bool:\n",
"    \"\"\"\n",
"    Detects if a signature exists within a specified bounding box in an image.\n",
"\n",
"    Args:\n",
"        json_data (Dict[str, Any]): The JSON data containing image information.\n",
"        normalizedVertices (List[Dict[str, float]]): The normalized vertices of the bounding box.\n",
"        pageNumber (int): The page number of the document.\n",
"        blankLinePixelCount (int): Minimum pixel count to distinguish blank areas.\n",
"        signatureThresholdPixelCount (int): Threshold pixel count for detecting signatures.\n",
"\n",
"    Returns:\n",
"        bool: True if a signature is detected, False otherwise.\n",
"    \"\"\"\n",
"    bounding_box = normalizedVertices\n",
"\n",
"    img_height = json_data[\"pages\"][pageNumber][\"image\"][\"height\"]\n",
"    img_width = json_data[\"pages\"][pageNumber][\"image\"][\"width\"]\n",
"\n",
"    x = [v[\"x\"] for v in bounding_box]\n",
"    y = [v[\"y\"] for v in bounding_box]\n",
"\n",
"    left = min(x) * img_width - 1\n",
"    top = min(y) * img_height - 3\n",
"    right = max(x) * img_width + 5\n",
"    bottom = max(y) * img_height\n",
"\n",
"    boundingBoxCoordinates = (left, top, right, bottom)\n",
"\n",
"    content = base64.b64decode(json_data[\"pages\"][pageNumber][\"image\"][\"content\"])\n",
"    image = Image.open(io.BytesIO(content))\n",
"\n",
"    cropped_image = image.crop(boundingBoxCoordinates)\n",
"    cropped_image.save(\"cropped.jpeg\")\n",
"\n",
"    cropped_img = cv2.imread(\"cropped.jpeg\", 2)\n",
"    cropped_bw_image = cv2.threshold(cropped_img, 127, 255, cv2.THRESH_BINARY)\n",
"\n",
"    pixel_value, occurrence = np.unique(cropped_bw_image[1], return_counts=True)\n",
"    pixel_counts = dict(zip(pixel_value, occurrence))\n",
"    cropped_black_pixel = int(pixel_counts.get(0, 0))\n",
"\n",
"    os.remove(\"cropped.jpeg\")\n",
"\n",
"    return (cropped_black_pixel > blankLinePixelCount) and (\n",
"        cropped_black_pixel > signatureThresholdPixelCount\n",
"    )" ] },
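{ "cell_type": "markdown", "id": "c0ffee01-0003-4a6b-9c3d-1a2b3c4d5e03", "metadata": {}, "source": [ "**How the detection works (illustrative sketch).** In the next step, the pipeline crops a thin band just above each matched `Initial` label and passes it to `signatureDetection`, which binarizes the crop with a 127/255 threshold and counts black pixels: a blank signature line produces very few dark pixels, while handwriting produces many. The cell below demonstrates that idea on a synthetic image; the scribble, image size, and the printed 375-pixel threshold (the default in `signatureDetection`) are illustrative only and do not come from a real document." ] },
{ "cell_type": "code", "execution_count": null, "id": "c0ffee01-0004-4a6b-9c3d-1a2b3c4d5e04", "metadata": { "tags": [] }, "outputs": [], "source": [
"# Illustrative only: binarize a crop and count dark pixels, the same idea\n",
"# signatureDetection uses. Uses numpy (np) and cv2 imported in step 1.\n",
"def dark_pixel_count(gray_img: np.ndarray) -> int:\n",
"    \"\"\"Binarize with a 127/255 threshold and count black (0) pixels.\"\"\"\n",
"    _, bw = cv2.threshold(gray_img, 127, 255, cv2.THRESH_BINARY)\n",
"    values, counts = np.unique(bw, return_counts=True)\n",
"    return int(dict(zip(values, counts)).get(0, 0))\n",
"\n",
"\n",
"# A blank white signature box ...\n",
"blank_box = np.full((60, 300), 255, dtype=np.uint8)\n",
"# ... and the same box with a synthetic scribble drawn across it.\n",
"signed_box = blank_box.copy()\n",
"for x_pos in range(10, 290, 2):\n",
"    y_pos = 30 + int(20 * np.sin(x_pos / 12))\n",
"    cv2.circle(signed_box, (x_pos, y_pos), 2, 0, -1)\n",
"\n",
"print(\"Blank box dark pixels :\", dark_pixel_count(blank_box))\n",
"print(\"Signed box dark pixels:\", dark_pixel_count(signed_box))\n",
"print(\"Default threshold used in signatureDetection:\", 375)" ] },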
{ "cell_type": "markdown", "id": "ac1d85cf-7529-469b-be7e-3e00a778af87", "metadata": {}, "source": [ "### 4. Run the code" ] },
{ "cell_type": "code", "execution_count": null, "id": "49dab42f-6c98-4aac-b804-cd3ec81127f8", "metadata": { "tags": [] }, "outputs": [], "source": [
"if __name__ == \"__main__\":\n",
"    res = batch_process_documents_sample(\n",
"        project_id=project_id,\n",
"        location=location,\n",
"        processor_id=processor_id,\n",
"        gcs_input_uri=input_gcs_path,\n",
"        gcs_output_uri=output_gcs_path,\n",
"    )\n",
"    print(\"Batch Process Completed\")\n",
"    print(\"Signature Detection In Progress\")\n",
"    file_names_list, file_path_dict = file_names(output_gcs_path)\n",
"    for files in file_path_dict:\n",
"        json_proto_data = documentai_json_proto_downloader(\n",
"            output_gcs_path.split(\"/\")[2], file_path_dict[files]\n",
"        )\n",
"\n",
"        # Pattern to match the word \"Initial\", ignoring case\n",
"        pattern = r\"(Initial)\"\n",
"        # Using re.finditer to find all matches along with their indices\n",
"        matches = [\n",
"            {\"Start\": match.start(), \"End\": match.end()}\n",
"            for match in re.finditer(pattern, json_proto_data.text, flags=re.IGNORECASE)\n",
"        ]\n",
"\n",
"        json_data = documentai.Document.to_dict(json_proto_data)\n",
"        new_entities = []\n",
"        for match in matches:\n",
"            token_text_anc = []\n",
"            x_ver = []\n",
"            y_ver = []\n",
"            page_num = \"\"\n",
"            for page in json_proto_data.pages:\n",
"                for token in page.tokens:\n",
"                    token_seg = token.layout.text_anchor.text_segments\n",
"                    norm_ver = token.layout.bounding_poly.normalized_vertices\n",
"                    for seg in token_seg:\n",
"                        if (\n",
"                            seg.start_index >= match[\"Start\"] - 1\n",
"                            and seg.end_index <= match[\"End\"] + 2\n",
"                        ):\n",
"                            token_text_anc.append(\n",
"                                {\n",
"                                    \"start_index\": seg.start_index,\n",
"                                    \"end_index\": seg.end_index,\n",
"                                }\n",
"                            )\n",
"                            for ver in norm_ver:\n",
"                                x_ver.append(ver.x)\n",
"                                y_ver.append(ver.y)\n",
"                            page_num = page.page_number - 1\n",
"\n",
"            nor_ver = [\n",
"                {\"x\": min(x_ver), \"y\": min(y_ver)},\n",
"                {\"x\": max(x_ver), \"y\": min(y_ver)},\n",
"                {\"x\": min(x_ver), \"y\": max(y_ver)},\n",
"                {\"x\": max(x_ver), \"y\": max(y_ver)},\n",
"            ]\n",
"            nv_box = bbox_maker(nor_ver)\n",
"            updated_nv = [\n",
"                {\"x\": nv_box[0] - 0.01, \"y\": nv_box[1] - 0.02},\n",
"                {\"x\": nv_box[0] - 0.01, \"y\": nv_box[1]},\n",
"                {\"x\": nv_box[2] + 0.01, \"y\": nv_box[1] - 0.02},\n",
"                {\"x\": nv_box[2] + 0.01, \"y\": nv_box[1]},\n",
"            ]\n",
"\n",
"            if signatureDetection(json_data, updated_nv, page_num):\n",
"                updated_nv_box = bbox_maker(updated_nv)\n",
"                nv_box = bbox_maker(nor_ver)\n",
"                min_y = min(updated_nv_box[1], nv_box[1])\n",
"                max_y = max(updated_nv_box[3], nv_box[3])\n",
"                min_x = max(updated_nv_box[2], nv_box[2]) + 0.01\n",
"                max_x = 1\n",
"                token_data = get_token_data(\n",
"                    json_proto_data, min_x, max_x, min_y, max_y, page_num\n",
"                )\n",
"                new_entities.append(\n",
"                    create_new_entity(token_data, \"initial_label\", page_num)\n",
"                )\n",
"        if \"entities\" in json_data:\n",
"            json_data[\"entities\"].extend(new_entities)\n",
"        else:\n",
"            json_data[\"entities\"] = new_entities\n",
"        store_document_as_json(\n",
"            json.dumps(json_data),\n",
"            output_updated_gcs_path.split(\"/\")[2],\n",
"            \"/\".join(output_updated_gcs_path.split(\"/\")[3:]) + files,\n",
"        )\n",
"    print(\"Signature Detection Completed and Updated JSON.\")" ] },
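{ "cell_type": "markdown", "id": "c0ffee01-0005-4a6b-9c3d-1a2b3c4d5e05", "metadata": {}, "source": [ "**Optional: inspect the results.** The cell below is a small sketch that downloads one updated JSON file from `output_updated_gcs_path` and prints the `initial_label` entities that were added. It assumes the previous cell has finished and written at least one file; it is not required for the main workflow." ] },
{ "cell_type": "code", "execution_count": null, "id": "c0ffee01-0006-4a6b-9c3d-1a2b3c4d5e06", "metadata": { "tags": [] }, "outputs": [], "source": [
"# Optional: download one updated JSON and print the detected signature entities.\n",
"# Assumes the previous cell has already written files to output_updated_gcs_path.\n",
"output_bucket_name = output_updated_gcs_path.split(\"/\")[2]\n",
"output_prefix = \"/\".join(output_updated_gcs_path.split(\"/\")[3:])\n",
"\n",
"client = storage.Client(project=project_id)\n",
"json_blobs = [\n",
"    blob\n",
"    for blob in client.list_blobs(output_bucket_name, prefix=output_prefix)\n",
"    if blob.name.endswith(\".json\")\n",
"]\n",
"if json_blobs:\n",
"    sample = json.loads(json_blobs[0].download_as_bytes())\n",
"    print(\"File:\", json_blobs[0].name)\n",
"    for entity in sample.get(\"entities\", []):\n",
"        if entity.get(\"type\") == \"initial_label\":\n",
"            print(\"  initial_label:\", repr(entity.get(\"mentionText\", \"\")))\n",
"else:\n",
"    print(\"No updated JSON files found under\", output_updated_gcs_path)" ] },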
{ "cell_type": "markdown", "id": "16e3f538-c1ab-4ff1-a720-80d07ea9245b", "metadata": {}, "source": [ "### Output Details" ] },
{ "cell_type": "markdown", "id": "b4badf9b-1b40-4b6a-9ac9-b7a0f233018f", "metadata": { "tags": [] }, "source": [ "### Before Detection\n", "<img src='./images/before_detection.png' width=400 height=600 alt=\"Sample Output\">\n", "### After Detection\n", "<img src='./images/after_detection.png' width=400 height=600 alt=\"Sample Output\">" ] },
{ "cell_type": "code", "execution_count": null, "id": "40c13d25-50de-4c31-ad8b-13eb022b0303", "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "environment": { "kernel": "conda-base-py", "name": "workbench-notebooks.m127", "type": "gcloud", "uri": "us-docker.pkg.dev/deeplearning-platform-release/gcr.io/workbench-notebooks:m127" }, "kernelspec": { "display_name": "Python 3 (ipykernel) (Local)", "language": "python", "name": "conda-base-py" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.16" } }, "nbformat": 4, "nbformat_minor": 5 }