community/pdf-annotator-python/main.py (133 lines of code) (raw):
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""This module defines a CLI that uses Document AI to annotate a PDF document"""
import argparse
import os
import sys
from typing import Any, Optional

import google.auth
from google.cloud.documentai_v1beta3 import DocumentProcessorServiceClient
from google.cloud.documentai_v1beta3 import Processor
import pikepdf
# Location used when --multi-region-location is not given on the command line.
DEFAULT_MULTI_REGION_LOCATION = "us"
# Processor type used when --form-processor-type is not given on the command line.
DEFAULT_PROCESSOR_TYPE = "FORM_PARSER_PROCESSOR"
def main(args):
    """Annotate a PDF document using the Document AI API.

    Sends the input PDF to a Document AI processor, then writes a copy of the
    PDF in which every detected form field carries a text annotation of the
    form ``name: value``.

    Args:
        args: Parsed command-line namespace providing ``input``, ``output``,
            ``project_id``, ``multi_region_location`` and
            ``form_processor_type``. ``project_id`` and ``output`` are filled
            in on the namespace when they were not supplied.

    Returns:
        ``0`` on success, ``1`` when the input file does not exist.
    """
    input_path = os.path.abspath(args.input)

    # Validate the input before touching credentials or the API, so that a
    # mistyped path never creates a processor as a side effect.
    if not os.path.isfile(input_path):
        print(f"could not find file at {input_path}")
        return 1

    # If an output path is not specified, write next to the input file.
    # splitext removes only the extension; str.rstrip(".pdf") would strip any
    # trailing '.', 'p', 'd' or 'f' characters (e.g. "app.pdf" -> "a").
    if not args.output:
        root, extension = os.path.splitext(input_path)
        stem = root if extension.lower() == ".pdf" else input_path
        args.output = f"{stem}_annotated.pdf"

    if not args.project_id:
        _, project_id = google.auth.default()
        args.project_id = project_id

    parent = f"projects/{args.project_id}/locations/{args.multi_region_location}"

    client = DocumentProcessorServiceClient()

    processor_id = find_processor_id_of_type(client, parent, args.form_processor_type)
    if processor_id is None:
        print(
            f"no form processor found. "
            f'creating new processor of type "{args.form_processor_type}"',
        )
        processor_id = create_processor(client, parent, args.form_processor_type)

    # Flush so the progress message is visible while the request is in flight.
    print("Calling Document AI API...", end="", flush=True)
    with open(input_path, "rb") as pdf_file:
        document = client.process_document(
            request={
                "name": f"{parent}/processors/{processor_id}",
                "raw_document": {
                    "content": pdf_file.read(),
                    "mime_type": "application/pdf",
                },
            }
        ).document

    # Keep the source PDF open until the annotated copy has been saved: the
    # pages appended below originate from it. The context manager guarantees
    # the file handle is released afterwards, even if an error occurs.
    with pikepdf.Pdf.open(input_path) as original_pdf:
        annotated_pdf = pikepdf.Pdf.new()

        for page_num, page_info in enumerate(document.pages):
            annotated_pdf.pages.append(original_pdf.pages[page_num])
            page = annotated_pdf.pages[page_num]

            print(
                f"Found {len(page_info.form_fields)} form fields on page {page_num + 1}:"
            )

            # Calculate the max "x" and "y" coordinate values for the PDF
            # this uses the PDF's own built in measuring units which need
            # to be used to place annotations
            page_max_x = float(page.trimbox[2])
            page_max_y = float(page.trimbox[3])

            page_annotations = []
            for field in page_info.form_fields:
                # Use the first two normalized vertices of the field name and
                # the max "x" and "y" coordinates to calculate the position of
                # the annotation in the PDF's built in measuring units. The
                # "y" values are subtracted from the page height to flip the
                # vertical axis.
                vertices = field.field_name.bounding_poly.normalized_vertices
                coord1 = vertices[0]
                coord2 = vertices[1]
                rect = [
                    coord1.x * page_max_x,
                    page_max_y - coord1.y * page_max_y,
                    coord2.x * page_max_x,
                    page_max_y - coord2.y * page_max_y,
                ]

                # Extract the parsed name and values of each field
                # as determined by Document AI's API
                name = layout_to_text(field.field_name, document.text)
                value = layout_to_text(field.field_value, document.text)
                annotation_text = f"{name}: {value}"

                # Create a PDF annotation for this field name value pair
                page_annotations.append(
                    pikepdf.Dictionary(
                        Type=pikepdf.Name.Annot,
                        Subtype=pikepdf.Name.Text,
                        Rect=rect,
                        Name=pikepdf.Name.Note,
                        Contents=pikepdf.String(annotation_text),
                        Open=False,
                    )
                )
                print(f"adding annotation: {annotation_text}")

            # Add all the annotations for this page
            page.Annots = annotated_pdf.make_indirect(pikepdf.Array(page_annotations))

        print(f"Saving annotated PDF to {args.output}.")
        annotated_pdf.save(
            args.output,
            min_version=original_pdf.pdf_version,
            # Disable annotation modification
            encryption=pikepdf.Encryption(
                owner="", user="", allow=pikepdf.Permissions(modify_annotation=False)
            ),
        )

    print("Done.")
    return 0
def layout_to_text(layout: Any, text: str) -> str:
    """Return the document text that a layout refers to, cleaned for display.

    Document AI identifies form fields by their offsets in the entirety of the
    document's text. This function converts those offsets to a string.

    Args:
        layout: Object exposing ``text_anchor.text_segments``; each segment
            carries ``start_index`` and ``end_index`` offsets into ``text``.
        text: The full document text the offsets refer to.

    Returns:
        The concatenated segment text with newlines and colons removed and
        leading/trailing whitespace stripped. Empty when there are no
        segments.
    """
    # If a text segment spans several lines, it will be stored in different
    # text segments, so the pieces are concatenated in order.
    pieces = [
        text[int(segment.start_index) : int(segment.end_index)]
        for segment in layout.text_anchor.text_segments
    ]
    # Remove newlines and colons, then trim the surrounding whitespace.
    return "".join(pieces).replace("\n", "").replace(":", "").strip()
def create_processor(
    client: DocumentProcessorServiceClient, parent: str, processor_type: str
) -> str:
    """Create a processor of ``processor_type`` under ``parent``.

    The processor type is also used as the new processor's display name.
    Returns the last path component of the created processor's resource name.
    """
    requested = Processor(display_name=processor_type, type_=processor_type)
    created = client.create_processor(parent=parent, processor=requested)
    # Everything after the final "/" of the resource name is the processor ID.
    _, _, processor_id = created.name.rpartition("/")
    return processor_id
def find_processor_id_of_type(
    client: "DocumentProcessorServiceClient", parent: str, tartget_processor_type: str
) -> Optional[str]:
    """Search ``parent`` for a processor of the given type and return its ID.

    Args:
        client: Document AI client used to list the processors.
        parent: Resource name the processors are listed under.
        tartget_processor_type: Processor type to look for. The spelling of
            this parameter name is kept so keyword callers continue to work.

    Returns:
        The last path component of the first matching processor's resource
        name, or ``None`` when no processor of that type exists.
    """
    # Iterate the pager itself rather than its ``.processors`` attribute: the
    # attribute only exposes the most recently fetched page, whereas iterating
    # follows the page tokens through every page of results.
    for processor in client.list_processors(parent=parent):
        if processor.type_ == tartget_processor_type:
            return processor.name.split("/")[-1]
    return None
if __name__ == "__main__":
    # Describe the command-line interface; only the input path is mandatory.
    cli = argparse.ArgumentParser(description="Annotate a PDF document.")
    cli.add_argument(
        "-i",
        "--input",
        required=True,
        help="filepath of input PDF to annotate",
    )
    cli.add_argument("--output", help="path to save annotated PDF")
    cli.add_argument(
        "--project-id",
        help="Project ID to use to call the Document AI API",
    )
    cli.add_argument(
        "--multi-region-location",
        default=DEFAULT_MULTI_REGION_LOCATION,
        help="multi-regional location for document storage and processing",
    )
    cli.add_argument(
        "--form-processor-type",
        default=DEFAULT_PROCESSOR_TYPE,
        help='type of form processor e.g. "FORM_W9_PROCESSOR"',
    )

    # The value returned by main() becomes the process exit status.
    exit_status = main(cli.parse_args())
    sys.exit(exit_status)