{
"cells": [
{
"cell_type": "markdown",
"id": "29399724",
"metadata": {},
"source": [
"# Reverse Annotation Tool"
]
},
{
"cell_type": "markdown",
"id": "1e4a6def",
"metadata": {},
"source": [
"* Author: docai-incubator@google.com"
]
},
{
"cell_type": "markdown",
"id": "6029ebef",
"metadata": {},
"source": [
"## Disclaimer\n",
"\n",
"This tool is not supported by the Google engineering team or product team. It is provided and supported on a best-effort basis by the **DocAI Incubator Team**. No guarantees of performance are implied."
]
},
{
"cell_type": "markdown",
"id": "18bbfedd",
"metadata": {},
"source": [
"# Objective\n",
"This tool helps in annotating or labeling the entities in the document based on the ocr text tokens. The notebook script expects the input file containing the name of entities in tabular format. And the first row is the header representing the entities that need to be labeled in every document. The script calls the processor and parses each of these input documents. The parsed document is then annotated if input entities are present in the document based on the OCR text tokens. The result is an output json file with updated entities and exported into a storage bucket path. This result json files can be imported into a processor to further check the annotations are existing as per the input file which was provided to the script prior the execution."
]
},
{
"cell_type": "markdown",
"id": "4a77d9e9",
"metadata": {},
"source": [
"# Prerequisites\n",
"* Vertex AI Notebook\n",
"* Input csv file containing list of files to be labeled.\n",
"* Document AI Processor\n",
"* GCS bucket for processing of the input documents and writing the output."
]
},
{
"cell_type": "markdown",
"id": "cad788f5",
"metadata": {},
"source": [
"# Step-by-Step Procedure"
]
},
{
"cell_type": "markdown",
"id": "3e511f7a",
"metadata": {},
"source": [
"## 1. Import Modules/Packages"
]
},
{
"cell_type": "code",
"execution_count": 7,
"id": "80ecc19e-d0fb-4435-9284-44318db1937c",
"metadata": {},
"outputs": [],
"source": [
"!pip install google-cloud-documentai\n",
"!pip install google-cloud-storage\n",
"!pip install numpy\n",
"!pip install pandas\n",
"!pip install fuzzywuzzy"
]
},
{
"cell_type": "code",
"execution_count": 8,
"id": "10e90cfe",
"metadata": {},
"outputs": [],
"source": [
"# Run this cell to download utilities module\n",
"!wget https://raw.githubusercontent.com/GoogleCloudPlatform/document-ai-samples/main/incubator-tools/best-practices/utilities/utilities.py"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "2d4b7289",
"metadata": {},
"outputs": [],
"source": [
"import csv\n",
"import re\n",
"from typing import Dict, List, Tuple, Union\n",
"\n",
"import numpy as np\n",
"import pandas as pd\n",
"from fuzzywuzzy import fuzz\n",
"from google.cloud import documentai_v1beta3 as documentai\n",
"from google.cloud import storage\n",
"\n",
"from utilities import process_document_sample, store_document_as_json"
]
},
{
"cell_type": "markdown",
"id": "4aa4caea",
"metadata": {},
"source": [
"## 2. Input Details"
]
},
{
"cell_type": "markdown",
"id": "717bb8cc",
"metadata": {},
"source": [
"* **PROJECT_ID** : GCP project Id\n",
"* **LOCATION** : Location of DocumentAI processor, either `us` or `eu`\n",
"* **PROCESSOR_ID** : DocumentAI processor Id\n",
"* **PROCESSOR_VERSION** : DocumentAI processor verrsion Id(eg- pretrained-invoice-v2.0-2023-12-06)\n",
"* **INPUT_BUCKET** : It is input GCS folder path which contains pdf files\n",
"* **OUTPUT_BUCKET** : It is a GCS folder path to store post-processing results\n",
"* **READ_SCHEMA_FILENAME** : It is a csv file contains entities(type & mention_text) data, which are needed to be annotated. In csv, Column-1(FileNames) contains file names , Column-2(entity_type) contains data to be annotated, Column-3 and its following fields should follow same field-schema as Column-2. In otherwords it is a schema file containing a tabular data with header row as name of the entities that needs to be identified and annotated in the document and the following rows are for each file whose values needs to be extracted. \n",
"\n",
" <img src='./images/csv_sample.png' width=800 height=400>"
]
},
{
"cell_type": "code",
"execution_count": 10,
"id": "bfdd838b",
"metadata": {},
"outputs": [],
"source": [
"PROJECT_ID = \"xx-xx-xx\"\n",
"PROCESSOR_ID = \"xx-xx-xx\"\n",
"PROCESSOR_VERSION = \"pretrained-invoice-v2.0-2023-12-06\"\n",
"INPUT_BUCKET = \"gs://BUCKET_NAME/reverse_annotation_tool/input/\"\n",
"OUTPUT_BUCKET = \"gs://BUCKET_NAME/reverse_annotation_tool/output/\"\n",
"LOCATION = \"us\"\n",
"# Column headers based on your original CSV structure\n",
"READ_SCHEMA_FILENAME = \"schema_and_data.csv\""
]
},
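{
"cell_type": "markdown",
"id": "1a2b3c4d",
"metadata": {},
"source": [
"Optionally, preview the schema CSV before running the annotation loop. The cell below is a minimal, illustrative sanity check (not part of the original flow); it assumes the file referenced by `READ_SCHEMA_FILENAME` is available in the notebook's working directory and only inspects its column headers."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "5e6f7a8b",
"metadata": {},
"outputs": [],
"source": [
"# Optional sanity check of the schema CSV (illustrative; assumes the file exists locally)\n",
"preview_df = pd.read_csv(READ_SCHEMA_FILENAME, dtype=str)\n",
"print(\"Columns:\", list(preview_df.columns))\n",
"assert \"FileNames\" in preview_df.columns, \"Schema CSV must contain a 'FileNames' column\"\n",
"line_item_columns = [c for c in preview_df.columns if c.startswith(\"line_item/\")]\n",
"print(\"Line-item sub-fields:\", line_item_columns)\n",
"preview_df.head()"
]
},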
{
"cell_type": "markdown",
"id": "3347e5d1",
"metadata": {},
"source": [
"## 3. Run Below Code-Cells"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "94606760",
"metadata": {},
"outputs": [],
"source": [
"def read_input_schema(read_schema_filename: str) -> pd.DataFrame:\n",
" \"\"\"\n",
" Reads an input schema from a CSV file.\n",
"\n",
" Args:\n",
" - read_schema_file_name (str): Path to the CSV file containing the schema.\n",
"\n",
" Returns:\n",
" - pd.DataFrame: DataFrame containing the schema data.\n",
" \"\"\"\n",
"\n",
" df = pd.read_csv(read_schema_filename, dtype=str)\n",
" df = df.drop(df[df[\"FileNames\"] == \"Type\"].index)\n",
" df.replace(\"\", np.nan, inplace=True)\n",
" return df\n",
"\n",
"\n",
"def get_token_range(json_data: documentai.Document) -> Dict[range, Dict[str, int]]:\n",
" \"\"\"\n",
" Gets the token ranges from the provided JSON data.\n",
"\n",
" Args:\n",
" - json_data (documentai.Document): JSON data containing page and token information.\n",
"\n",
" Returns:\n",
" - dict: Dictionary containing token ranges with page number and token number information.\n",
" \"\"\"\n",
"\n",
" token_range = {}\n",
" for pn, page in enumerate(json_data.pages):\n",
" for tn, token in enumerate(page.tokens):\n",
" ts = token.layout.text_anchor.text_segments[0]\n",
" start_index = ts.start_index\n",
" end_index = ts.end_index\n",
" token_range[range(start_index, end_index)] = {\n",
" \"page_number\": pn,\n",
" \"token_number\": tn,\n",
" }\n",
" return token_range\n",
"\n",
"\n",
"def fix_page_anchor_entity(\n",
" entity: documentai.Document.Entity,\n",
" json_data: documentai.Document,\n",
" token_range: Dict[range, Dict[str, int]],\n",
") -> documentai.Document.Entity:\n",
" \"\"\"\n",
" Fixes the page anchor entity based on the provided JSON data and token range.\n",
"\n",
" Args:\n",
" - entity (documentai.Document.Entity): Entity object to be fixed.\n",
" - json_data (documentai.Document): JSON data containing page and token information.\n",
" - token_range (Dict[range, Dict[str, int]]):\n",
" Dictionary containing token ranges with page number and token number information.\n",
"\n",
" Returns:\n",
" - documentai.Document.Entity: Fixed entity object.\n",
" \"\"\"\n",
"\n",
" start = entity.text_anchor.text_segments[0].start_index\n",
" end = entity.text_anchor.text_segments[0].end_index - 1\n",
"\n",
" for j in token_range:\n",
" if start in j:\n",
" lower_token = token_range[j]\n",
" for j in token_range:\n",
" if end in j:\n",
" upper_token = token_range[j]\n",
"\n",
" lower_token_data = (\n",
" json_data.pages[lower_token[\"page_number\"]]\n",
" .tokens[lower_token[\"token_number\"]]\n",
" .layout.bounding_poly.normalized_vertices\n",
" )\n",
" upper_token_data = (\n",
" json_data.pages[int(upper_token[\"page_number\"])]\n",
" .tokens[int(upper_token[\"token_number\"])]\n",
" .layout.bounding_poly.normalized_vertices\n",
" )\n",
"\n",
" def get_coords(\n",
" normalized_vertex: documentai.NormalizedVertex,\n",
" ) -> Tuple[float, float]:\n",
" return normalized_vertex.x, normalized_vertex.y\n",
"\n",
" xa, ya = get_coords(lower_token_data[0])\n",
" xa_, ya_ = get_coords(upper_token_data[0])\n",
"\n",
" xb, yb = get_coords(lower_token_data[1])\n",
" xb_, yb_ = get_coords(upper_token_data[1])\n",
"\n",
" xc, yc = get_coords(lower_token_data[2])\n",
" xc_, yc_ = get_coords(upper_token_data[2])\n",
"\n",
" xd, yd = get_coords(lower_token_data[3])\n",
" xd_, yd_ = get_coords(upper_token_data[3])\n",
"\n",
" cord1 = {\"x\": min(xa, xa_), \"y\": min(ya, ya_)}\n",
" cord2 = {\"x\": max(xb, xb_), \"y\": min(yb, yb_)}\n",
" cord3 = {\"x\": max(xc, xc_), \"y\": max(yc, yc_)}\n",
" cord4 = {\"x\": min(xd, xd_), \"y\": max(yd, yd_)}\n",
" nvs = []\n",
" for coords in [cord1, cord2, cord3, cord4]:\n",
" x, y = coords[\"x\"], coords[\"y\"]\n",
" nv = documentai.NormalizedVertex(x=x, y=y)\n",
" nvs.append(nv)\n",
" entity.page_anchor.page_refs[0].bounding_poly.normalized_vertices = nvs\n",
" entity.page_anchor.page_refs[0].page = lower_token[\"page_number\"]\n",
" return entity\n",
"\n",
"\n",
"def create_entity(\n",
" mention_text: str, type_: str, match: re.Match\n",
") -> documentai.Document.Entity:\n",
" \"\"\"\n",
" Creates a Document Entity based on the provided mention text, type, and match object.\n",
"\n",
" Args:\n",
" - mention_text (str): The text to be mentioned in the entity.\n",
" - type_ (str): The type of the entity.\n",
" - match (re.Match): Match object representing the start and end indices of the mention text.\n",
"\n",
" Returns:\n",
" - documentai.Document.Entity: The created Document Entity.\n",
" \"\"\"\n",
"\n",
" entity = documentai.Document.Entity()\n",
" entity.mention_text = mention_text\n",
" entity.type = type_\n",
" bp = documentai.BoundingPoly(normalized_vertices=[])\n",
" ts = documentai.Document.TextAnchor.TextSegment(\n",
" start_index=str(match.start()), end_index=str(match.end())\n",
" )\n",
" entity.text_anchor.text_segments = [ts]\n",
" entity.page_anchor.page_refs = [\n",
" documentai.Document.PageAnchor.PageRef(bounding_poly=bp)\n",
" ]\n",
"\n",
" return entity\n",
"\n",
"\n",
"# Line Items processing\n",
"def extract_anchors(prop: documentai.Document.Entity) -> Tuple[str, str]:\n",
" \"\"\"It will look for text anchors and page anchors in Entity object\n",
"\n",
" Args:\n",
" prop (documentai.Document.Entity): DocumentAI Entity object\n",
"\n",
" Returns:\n",
" Tuple[str, str]: It contains text_anchors and page_anchors in string-format\n",
" \"\"\"\n",
" text_anchor = f\"{prop.text_anchor.text_segments}\" if prop.text_anchor else \"MISSING\"\n",
" page_anchor = f\"{prop.page_anchor.page_refs[0]}\" if prop.page_anchor else \"MISSING\"\n",
" return text_anchor, page_anchor\n",
"\n",
"\n",
"def improved_similarity_score(str1: str, str2: str) -> float:\n",
" \"\"\"it return similarity/fuzzy ratio between two strings\n",
"\n",
" Args:\n",
" str1 (str): It is a text\n",
" str2 (str): It is also a text\n",
"\n",
" Returns:\n",
" float: similarity ration between string_1 and string_2\n",
" \"\"\"\n",
"\n",
" str1_parts = set(str1.split())\n",
" str2_parts = set(str2.split())\n",
" common_parts = str1_parts.intersection(str2_parts)\n",
" total_parts = str1_parts.union(str2_parts)\n",
" if not total_parts:\n",
" return 0.0\n",
" return len(common_parts) / len(total_parts)\n",
"\n",
"\n",
"def pair_items_with_improved_similarity(\n",
" gt_dict: Dict[str, Dict[str, str]], pred_dict: Dict[str, Dict[str, str]]\n",
") -> Dict[str, str]:\n",
" \"\"\"It pairs grounf_truth and prediction data based on similarity scrore between them\n",
"\n",
" Args:\n",
" gt_dict (Dict[str, Dict[str, str]]): A dictionary containing ground truth data\n",
" pred_dict (Dict[str, Dict[str, str]]): A dictionary containing predicton data\n",
"\n",
" Returns:\n",
" Dict[str, str]: It contains type & best matched mention text\n",
" \"\"\"\n",
" pairings = {}\n",
" for gt_key, gt_values in gt_dict.items():\n",
" gt_concat = \" \".join(gt_values.values()).lower()\n",
" best_match_key = None\n",
" best_score = -1\n",
" for pred_key, pred_values in pred_dict.items():\n",
" pred_values_only = {k: v[\"value\"] for k, v in pred_values.items()}\n",
" pred_concat = \" \".join(pred_values_only.values()).lower()\n",
" score = improved_similarity_score(gt_concat, pred_concat)\n",
" if score > best_score:\n",
" best_score = score\n",
" best_match_key = pred_key\n",
" pairings[gt_key] = best_match_key if best_score > 0 else None\n",
" return pairings\n",
"\n",
"\n",
"def process_documents(\n",
" csv_file_path: str, doc_obj: documentai.Document, file_name: str\n",
") -> Tuple[Dict[str, Dict[str, str]], Dict[str, Dict[str, str]], Dict[str, str]]:\n",
" \"\"\"\n",
" It an a helper function to get line_items , grouped entities and paired entity type and\n",
" its best match against ground truth\n",
"\n",
" Args:\n",
" csv_file_path (str):\n",
" CSV file path, It contains text's which need to annotated in doc-proto object\n",
" doc_obj (documentai.Document): DocumentAI Doc proto object\n",
" file_name (str): _description_\n",
"\n",
" Returns:\n",
" Tuple[Dict[str, Dict[str, str]],Dict[str, Dict[str, str]],Dict[str, str]]:\n",
" it returns line_items , grouped entities and paired entity type and\n",
" its best match against ground truth\n",
" \"\"\"\n",
" line_items_dict = {}\n",
" entity_groups_dict = {}\n",
"\n",
" # Read and process the CSV file\n",
" with open(csv_file_path, mode=\"r\", newline=\"\") as csv_file:\n",
" csv_reader = csv.reader(csv_file)\n",
" headers = next(csv_reader)\n",
" for index, row in enumerate(csv_reader):\n",
" row_dict = dict(zip(headers, row))\n",
" if row_dict.get(\"FileNames\") == file_name:\n",
" line_item_details = {}\n",
" has_line_item_values = False\n",
" for header, value in row_dict.items():\n",
" if \"line_item/\" in header and value:\n",
" has_line_item_values = True\n",
" line_item_details[header] = value\n",
" if line_item_details and has_line_item_values:\n",
" line_items_dict[f\"gt_line_item_{index}\"] = line_item_details\n",
"\n",
" n = 1\n",
" for entity in doc_obj.entities:\n",
" if entity.properties:\n",
" entity_details = {}\n",
" for prop in entity.properties:\n",
" key = prop.type_\n",
" value = prop.mention_text\n",
" text_anchor, page_anchor = extract_anchors(prop)\n",
" entity_details[key] = {\n",
" \"value\": value,\n",
" \"text_anchor\": text_anchor,\n",
" \"page_anchor\": page_anchor,\n",
" }\n",
" entity_groups_dict[f\"pred_line_item_{n}\"] = entity_details\n",
" n += 1\n",
"\n",
" # Pair items using the improved similarity score\n",
" improved_pairings = pair_items_with_improved_similarity(\n",
" line_items_dict, entity_groups_dict\n",
" )\n",
"\n",
" return line_items_dict, entity_groups_dict, improved_pairings\n",
"\n",
"\n",
"def extract_bounding_box_and_page(layout_info: str) -> Dict[str, Union[int, float]]:\n",
" \"\"\"It is used to get xy-coords and its page number from page_anchor\n",
"\n",
" Args:\n",
" layout_info (str): DocumentAI token object page_anchor data in string format\n",
"\n",
" Returns:\n",
" Dict[str, Union[int, float]]: It contains page_number and xy-coords of token\n",
" \"\"\"\n",
" x_values = []\n",
" y_values = []\n",
" page = 0 # Default page number\n",
"\n",
" for line in layout_info.split(\"\\n\"):\n",
" if \"x:\" in line:\n",
" _, x_value = line.split(\":\")\n",
" x_values.append(float(x_value.strip()))\n",
" elif \"y:\" in line:\n",
" _, y_value = line.split(\":\")\n",
" y_values.append(float(y_value.strip()))\n",
" elif line.startswith(\"page:\"):\n",
" _, page = line.split(\":\")\n",
" page = int(page.strip())\n",
"\n",
" return {\n",
" \"page\": page,\n",
" \"min_x\": min(x_values),\n",
" \"max_x\": max(x_values),\n",
" \"min_y\": min(y_values),\n",
" \"max_y\": max(y_values),\n",
" }\n",
"\n",
"\n",
"def get_page_anc_line(line_dict_1: Dict[str, str]) -> Dict[str, Union[int, float]]:\n",
" \"\"\"It is used to get xy-coords and its page number from page_anchor\n",
"\n",
" Args:\n",
" line_dict_1 (Dict[str, str]): Dictionary which holds page_anchor data\n",
"\n",
" Returns:\n",
" Dict[str, Union[int, float]]:\n",
" It returns page_number and xy-coords of based on page_anchor object\n",
" \"\"\"\n",
"\n",
" val_s = []\n",
" for en1, val1 in line_dict_1.items():\n",
" page_anc_dict = extract_bounding_box_and_page(val1[\"page_anchor\"])\n",
" val_s.append(page_anc_dict)\n",
" page_line = {\n",
" \"page\": val_s[0][\"page\"],\n",
" \"min_x\": min(entry[\"min_x\"] for entry in val_s),\n",
" \"max_x\": max(entry[\"max_x\"] for entry in val_s),\n",
" \"min_y\": min(entry[\"min_y\"] for entry in val_s),\n",
" \"max_y\": max(entry[\"max_y\"] for entry in val_s),\n",
" }\n",
"\n",
" return page_line\n",
"\n",
"\n",
"def get_cleaned_text(text: str) -> str:\n",
" \"\"\"it removes spaces & newline characters from provided text\n",
"\n",
" Args:\n",
" text (str): A text which need to be cleaned\n",
"\n",
" Returns:\n",
" str: text without containing spaces & newline chars\n",
" \"\"\"\n",
" return text.lower().replace(\" \", \"\").replace(\"\\n\", \"\")\n",
"\n",
"\n",
"def get_match(gt_mt_split: List[str], mt_temp: str) -> Union[List[str], None]:\n",
" \"\"\"It returns best match of mention text from ground truth text\n",
"\n",
" Args:\n",
" gt_mt_split (List[str]): It contains list of strings\n",
" mt_temp (str): It is text, which need to be checked against gt_mt_split for best match\n",
"\n",
" Returns:\n",
" Union[List[str], None]: It returns best match of mention text from ground truth text\n",
" \"\"\"\n",
" flag_found = False\n",
" for mt in gt_mt_split:\n",
" if fuzz.ratio(get_cleaned_text(mt_temp), get_cleaned_text(mt)) > 75:\n",
" gt_mt_split.remove(mt)\n",
" flag_found = True\n",
" return gt_mt_split\n",
" if flag_found == False:\n",
" return None\n",
"\n",
"\n",
"def get_new_entity(\n",
" doc_obj: documentai.Document,\n",
" page_anc_dict: Dict[str, Union[int, float]],\n",
" gt_mt: str,\n",
" type_en: str,\n",
") -> Union[documentai.Document.Entity, None]:\n",
" \"\"\"It creates new entity based on provided page_anchor, mention text and entity type\n",
"\n",
" Args:\n",
" doc_obj (documentai.Document): DocumentAI Doc proto object\n",
" page_anc_dict (Dict[str, Union[int, float]]):\n",
" It contains page_number and xy-coords of based on page_anchor object\n",
" gt_mt (str): text which uses as mention_text for entity object\n",
" type_en (str): text which uses as type_ for an entity object\n",
"\n",
" Returns:\n",
" Union[documentai.Document.Entity, None]:\n",
" It creates new entity based on provided page_anchor, mention text and entity type\n",
" \"\"\"\n",
" text_anc = []\n",
" page_anc = {\"x\": [], \"y\": []}\n",
" mt_text = \"\"\n",
" gt_mt_split = gt_mt.split()\n",
" for page_num, _ in enumerate(doc_obj.pages):\n",
" if page_num == int(page_anc_dict[\"page\"]):\n",
" for token in doc_obj.pages[page_num].tokens:\n",
" vertices = token.layout.bounding_poly.normalized_vertices\n",
" minx_token, miny_token = min(point.x for point in vertices), min(\n",
" point.y for point in vertices\n",
" )\n",
" maxx_token, maxy_token = max(point.x for point in vertices), max(\n",
" point.y for point in vertices\n",
" )\n",
" token_seg = token.layout.text_anchor.text_segments\n",
" for seg in token_seg:\n",
" token_start, token_end = seg.start_index, seg.end_index\n",
" if (\n",
" abs(miny_token - page_anc_dict[\"min_y\"]) <= 0.02\n",
" and abs(maxy_token - page_anc_dict[\"max_y\"]) <= 0.02\n",
" ):\n",
" mt_temp = doc_obj.text[token_start:token_end]\n",
"\n",
" if (\n",
" get_cleaned_text(mt_temp) in gt_mt.lower().replace(\" \", \"\")\n",
" or fuzz.ratio(\n",
" get_cleaned_text(mt_temp), gt_mt.lower().replace(\" \", \"\")\n",
" )\n",
" > 70\n",
" ):\n",
" if len(mt_temp) <= 2:\n",
" if (\n",
" fuzz.ratio(\n",
" mt_temp.lower().replace(\" \", \"\").replace(\"\\n\", \"\"),\n",
" gt_mt.lower().replace(\" \", \"\"),\n",
" )\n",
" > 80\n",
" ):\n",
" ts = documentai.Document.TextAnchor.TextSegment(\n",
" start_index=token_start, end_index=token_end\n",
" )\n",
" text_anc.append(ts)\n",
" page_anc[\"x\"].extend([minx_token, maxx_token])\n",
" page_anc[\"y\"].extend([miny_token, maxy_token])\n",
" mt_text += mt_temp\n",
" else:\n",
" ts = documentai.Document.TextAnchor.TextSegment(\n",
" start_index=token_start, end_index=token_end\n",
" )\n",
" text_anc.append(ts)\n",
" page_anc[\"x\"].extend([minx_token, maxx_token])\n",
" page_anc[\"y\"].extend([miny_token, maxy_token])\n",
" mt_text += mt_temp\n",
" else:\n",
" match_mt = get_match(gt_mt_split, mt_temp)\n",
" if match_mt:\n",
" ts = documentai.Document.TextAnchor.TextSegment(\n",
" start_index=token_start, end_index=token_end\n",
" )\n",
" text_anc.append(ts)\n",
" page_anc[\"x\"].extend([minx_token, maxx_token])\n",
" page_anc[\"y\"].extend([miny_token, maxy_token])\n",
" mt_text += mt_temp\n",
"\n",
" try:\n",
" x, y = page_anc.values()\n",
" page_anc_new = [\n",
" {\"x\": min(x), \"y\": min(y)},\n",
" {\"x\": max(x), \"y\": min(y)},\n",
" {\"x\": max(x), \"y\": max(y)},\n",
" {\"x\": min(x), \"y\": max(y)},\n",
" ]\n",
" nvs = []\n",
" for xy in page_anc_new:\n",
" nv = documentai.NormalizedVertex(**xy)\n",
" nvs.append(nv)\n",
" new_entity = documentai.Document.Entity()\n",
" new_entity.mention_text = mt_text\n",
" new_entity.type_ = type_en\n",
" ta = documentai.Document.TextAnchor(content=mt_text, text_segments=text_anc)\n",
" new_entity.text_anchor = ta\n",
" bp = documentai.BoundingPoly(normalized_vertices=nvs)\n",
" page_ref = documentai.Document.PageAnchor.PageRef(\n",
" page=str(page_anc_dict[\"page\"]), bounding_poly=bp\n",
" )\n",
" new_entity.page_anchor.page_refs = [page_ref]\n",
" return new_entity\n",
" except ValueError:\n",
" return None\n",
"\n",
"\n",
"def parse_page_anchor(page_anchor_str: str) -> documentai.Document.PageAnchor:\n",
" \"\"\"It creates page_anchor proto-object based on provided page_anchor text\n",
"\n",
" Args:\n",
" page_anchor_str (str): page anchor data in string format\n",
"\n",
" Returns:\n",
" documentai.Document.PageAnchor:\n",
" newly created page_anchor proto-object based on provided page_anchor text\n",
" \"\"\"\n",
" # Extract normalized vertices using the provided reference approach\n",
" vertices = []\n",
" page = \"0\" # Default to page 0 if not specified\n",
" lines = page_anchor_str.split(\"\\n\")\n",
" for idx, line in enumerate(lines):\n",
" if \"x:\" in line:\n",
" x = float(line.split(\":\")[1].strip())\n",
" # Ensure there's a corresponding 'y' line following 'x' line\n",
" if idx + 1 < len(lines) and \"y:\" in lines[idx + 1]:\n",
" y = float(lines[idx + 1].split(\":\")[1].strip())\n",
" nv = documentai.NormalizedVertex(x=x, y=y)\n",
" vertices.append(nv)\n",
" elif line.startswith(\"page:\"):\n",
" page = line.split(\":\")[1].strip()\n",
"\n",
" bp = documentai.BoundingPoly(normalized_vertices=vertices)\n",
" page_ref = documentai.Document.PageAnchor.PageRef(page=page, bounding_poly=bp)\n",
" page_anchor = documentai.Document.PageAnchor(page_refs=[page_ref])\n",
" return page_anchor\n",
"\n",
"\n",
"def parse_text_anchor(\n",
" text_anchor_str: str, content_str: str\n",
") -> documentai.Document.TextAnchor:\n",
" \"\"\"\n",
" It builds DocAI text Anchor object based on provided text anchor and content in string format\n",
"\n",
" Args:\n",
" text_anchor_str (str): DocAI text anchor object in string format\n",
" content_str (str): text to add to text anchot object\n",
"\n",
" Returns:\n",
" documentai.Document.TextAnchor:\n",
" Text Anchor object created based on provided text anchor and content in string format\n",
" \"\"\"\n",
"\n",
" # Simplified parsing for 'text_segments' from 'text_anchor'\n",
" segments_matches = re.findall(\n",
" r\"start_index: (\\d+)\\nend_index: (\\d+)\", text_anchor_str\n",
" )\n",
" text_segments = []\n",
" for si, ei in segments_matches:\n",
" ts = documentai.Document.TextAnchor.TextSegment(start_index=si, end_index=ei)\n",
" text_segments.append(ts)\n",
"\n",
" ta = documentai.Document.TextAnchor(\n",
" text_segments=text_segments, content=content_str\n",
" )\n",
" return ta\n",
"\n",
"\n",
"def construct_predicted_value_details(\n",
" predicted_value: Dict[str, str], type_: str\n",
") -> documentai.Document.Entity:\n",
" \"\"\"It will create new entinty object in doc-proto format\n",
"\n",
" Args:\n",
" predicted_value (Dict[str, str]):\n",
" it is dictionary wich contains text_anchor, page_anchor and value as string-object\n",
" type_ (str): text which needs to be assigned to entity.type_\n",
"\n",
" Returns:\n",
" documentai.Document.Entity: It returnds new entity object\n",
" \"\"\"\n",
" page_anchor = parse_page_anchor(predicted_value[\"page_anchor\"])\n",
" text_anchor = parse_text_anchor(\n",
" predicted_value[\"text_anchor\"], predicted_value.get(\"value\", \"\")\n",
" )\n",
" ent = documentai.Document.Entity()\n",
" ent.mention_text = predicted_value.get(\"value\", \"\")\n",
" ent.page_anchor = page_anchor\n",
" ent.text_anchor = text_anchor\n",
" ent.type_ = type_\n",
" return ent\n",
"\n",
"\n",
"df_schema = read_input_schema(READ_SCHEMA_FILENAME)\n",
"# Group by 'FileNames'\n",
"grouped = df_schema.groupby(\"FileNames\", as_index=False)\n",
"processed_rows = []\n",
"\n",
"for name, group in grouped:\n",
" # Get the total number of columns\n",
" max_columns = len(group.columns)\n",
" combined_row = []\n",
"\n",
" # Iterate over rows in the group\n",
" for index, row in group.iterrows():\n",
" row_list = row.tolist()\n",
" row_filled = row_list + [np.nan] * (\n",
" max_columns - len(row_list)\n",
" ) # Extend with NaNs if less than max_columns\n",
" combined_row.extend(row_filled)\n",
"\n",
" processed_rows.append(combined_row)\n",
"\n",
"\n",
"headers = [\n",
" header.strip()\n",
" for header in pd.read_csv(READ_SCHEMA_FILENAME, nrows=0).columns.tolist()\n",
"]\n",
"prefix = \"line_item/\"\n",
"\n",
"# Extract the part after 'line_item/' for each item that starts with the prefix\n",
"unique_entities = [item.split(\"/\")[-1] for item in headers if item.startswith(prefix)]\n",
"\n",
"processed_files = set() # Set to keep track of processed FileNames\n",
"\n",
"\n",
"client = storage.Client()\n",
"bucket = client.get_bucket(INPUT_BUCKET.split(\"/\")[2])\n",
"\n",
"for row in processed_rows:\n",
" file_name = row[0] # The first item is 'FileNames'\n",
"\n",
" if file_name not in processed_files:\n",
" print(\"Processing:\", file_name)\n",
" file_name_path = INPUT_BUCKET + file_name\n",
" file_name_path = \"/\".join(file_name_path.split(\"/\")[3:])\n",
" blob = bucket.blob(file_name_path)\n",
" content = blob.download_as_bytes()\n",
" res = utilities.process_document_sample(\n",
" project_id=PROJECT_ID,\n",
" location=LOCATION,\n",
" processor_id=PROCESSOR_ID,\n",
" pdf_bytes=content,\n",
" processor_version=PROCESSOR_VERSION,\n",
" )\n",
" res_dict = res.document\n",
" token_range = get_token_range(res_dict)\n",
"\n",
" # Add the file_name to the set of processed files\n",
" processed_files.add(file_name)\n",
"\n",
" parser_entities = res_dict.entities\n",
"\n",
" # Process the line items\n",
" line_items_dict, entity_groups_dict, improved_pairings = process_documents(\n",
" READ_SCHEMA_FILENAME, res_dict, file_name\n",
" )\n",
"\n",
" entities = [] # Initialize the list to hold all entities\n",
"\n",
" for match_key, match_value in improved_pairings.items():\n",
" if match_value:\n",
" entity = documentai.Document.Entity()\n",
" mention_texts = [] # To hold all mention_text values for concatenation\n",
"\n",
" for gt_k, gt_v in line_items_dict[match_key].items():\n",
" is_correct = True\n",
" if gt_k in entity_groups_dict[match_value].keys():\n",
" predicted_value = entity_groups_dict[match_value][gt_k]\n",
" similarity = fuzz.ratio(gt_v, predicted_value[\"value\"])\n",
" if similarity < 90:\n",
" is_correct = False\n",
" else:\n",
" predicted_value_details = construct_predicted_value_details(\n",
" predicted_value, gt_k\n",
" )\n",
" entity.type_ = gt_k.split(\"/\")[\n",
" 0\n",
" ] # Assuming 'line_item' is the desired type\n",
" entity.properties.append(predicted_value_details)\n",
" mention_texts.append(predicted_value_details.mention_text)\n",
" else:\n",
" is_correct = False\n",
"\n",
" if not is_correct:\n",
" page_line = get_page_anc_line(entity_groups_dict[match_value])\n",
" new_ent = get_new_entity(res_dict, page_line, gt_v, gt_k)\n",
" if new_ent is not None:\n",
" entity.type_ = gt_k.split(\"/\")[\n",
" 0\n",
" ] # Assuming 'line_item' is the desired type\n",
" entity.properties.append(new_ent)\n",
" mention_texts.append(new_ent.mention_text)\n",
" else:\n",
" predicted_value_details = construct_predicted_value_details(\n",
" predicted_value, gt_k\n",
" )\n",
" entity.properties.append(predicted_value_details)\n",
" mention_texts.append(predicted_value_details.mention_text)\n",
"\n",
" # Concatenate all mention_texts for the parent entity\n",
" entity.mention_text = \" \".join(mention_texts).strip()\n",
"\n",
" # Add the entity to the list if it has been populated with properties\n",
" if entity.properties:\n",
" entities.append(entity)\n",
"\n",
" # Initialize containers for processed and unprocessed entities\n",
" list_of_entities = []\n",
" list_of_entities_not_mapped = []\n",
" processed_entities = set() # Set to track processed entities\n",
"\n",
" # Iterate over rows and headers\n",
" for i in range(0, len(row), len(headers)):\n",
" row_slice = row[i : i + len(headers)]\n",
" for j in range(1, len(headers)):\n",
" type_ = headers[j]\n",
" mention_text = row_slice[j]\n",
" if \"/\" in type_:\n",
" continue # Skip if type_ contains '/'\n",
" # Check if entity matches parser_entities before using re.finditer\n",
" matched = False\n",
" for proc_ent in parser_entities:\n",
" if proc_ent.type_ == type_ and proc_ent.mention_text == mention_text:\n",
" matched = True\n",
" # Directly append the parser entity\n",
" list_of_entities.append(proc_ent)\n",
" break # Exit loop after match is found\n",
"\n",
" # If no match is found, proceed with finditer to create a new entity\n",
" if not matched and mention_text:\n",
" occurrences = re.finditer(\n",
" re.escape(str(mention_text)) + r\"[ |\\,|\\n]\", res_dict.text\n",
" )\n",
" for m in occurrences:\n",
" start, end = m.start(), m.end()\n",
" entity_id = (mention_text, start, end)\n",
" if entity_id not in processed_entities:\n",
" entity = create_entity(mention_text, type_, m)\n",
" try:\n",
" entity_modified = fix_page_anchor_entity(\n",
" entity, res_dict, token_range\n",
" )\n",
" processed_entities.add(entity_id)\n",
" list_of_entities.append(entity_modified)\n",
" except Exception as e:\n",
" print(\n",
" \"Not able to find \" + mention_text + \" in the OCR:\", e\n",
" )\n",
" continue\n",
"\n",
" # Update and write final output as in your existing code\n",
" res_dict.entities = list_of_entities\n",
" for entity in entities:\n",
" res_dict.entities.append(entity)\n",
"\n",
" # Write the final output to GCS\n",
" output_bucket_name = OUTPUT_BUCKET.split(\"/\")[2]\n",
" output_path_within_bucket = (\n",
" \"/\".join(OUTPUT_BUCKET.split(\"/\")[3:]) + file_name + \".json\"\n",
" )\n",
" utilities.store_document_as_json(\n",
" documentai.Document.to_json(res_dict),\n",
" output_bucket_name,\n",
" output_path_within_bucket,\n",
" )\n",
"print(\"Process Completed!!!\")"
]
},
{
"cell_type": "markdown",
"id": "a9eda18a",
"metadata": {},
"source": [
"# 4. Output Details\n",
"\n",
"As we can observe, data mentioned in csv is annotated in DocAI proto results\n",
"<img src='./images/output_sample.png' width=800 height=400></img>"
]
},
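{
"cell_type": "markdown",
"id": "9c8d7e6f",
"metadata": {},
"source": [
"To spot-check an exported result, the output JSON can be loaded back into a `documentai.Document` object. The cell below is a minimal verification sketch (not part of the original flow); `SAMPLE_FILE_NAME` is a hypothetical placeholder and should be replaced with one of the `FileNames` values from the schema CSV."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "3b4c5d6e",
"metadata": {},
"outputs": [],
"source": [
"# Verification sketch: load one exported result back into a Document proto.\n",
"# SAMPLE_FILE_NAME is a hypothetical placeholder; use a FileNames value from the schema CSV.\n",
"SAMPLE_FILE_NAME = \"SAMPLE_FILE.pdf\"\n",
"client = storage.Client()\n",
"out_bucket = client.get_bucket(OUTPUT_BUCKET.split(\"/\")[2])\n",
"out_prefix = \"/\".join(OUTPUT_BUCKET.split(\"/\")[3:])\n",
"blob = out_bucket.blob(out_prefix + SAMPLE_FILE_NAME + \".json\")\n",
"doc = documentai.Document.from_json(blob.download_as_text(), ignore_unknown_fields=True)\n",
"for ent in doc.entities:\n",
"    print(ent.type_, \"->\", repr(ent.mention_text))\n",
"    for prop in ent.properties:\n",
"        print(\"    \", prop.type_, \"->\", repr(prop.mention_text))"
]
}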
],
"metadata": {
"environment": {
"kernel": "python3",
"name": "common-cpu.m112",
"type": "gcloud",
"uri": "gcr.io/deeplearning-platform-release/base-cpu:m112"
},
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.12"
},
"widgets": {
"application/vnd.jupyter.widget-state+json": {
"state": {},
"version_major": 2,
"version_minor": 0
}
}
},
"nbformat": 4,
"nbformat_minor": 5
}