webhook/main.py
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
from collections.abc import Iterator
from datetime import datetime
import functions_framework
from cloudevents.http import CloudEvent
from google import genai # type: ignore
from google.genai.types import GenerateContentConfig # type: ignore
from google.api_core.client_options import ClientOptions
from google.cloud import documentai
from google.cloud import bigquery
from google.cloud import storage # type: ignore
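

# Entry point registered with the Functions Framework. The function is meant to
# be invoked by an Eventarc trigger for Cloud Storage object uploads, so
# event.data carries the object metadata (bucket, name, contentType,
# timeCreated) read below. It also expects the PROJECT_ID, LOCATION,
# DOCAI_PROCESSOR, OUTPUT_BUCKET, BQ_DATASET, and BQ_TABLE environment
# variables to be set (DOCAI_LOCATION is optional and defaults to "us").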
@functions_framework.cloud_event
def on_cloud_event(event: CloudEvent) -> None:
"""Process a new document from an Eventarc event.
Args:
event: CloudEvent object.
"""
try:
process_document(
event_id=event.data["id"],
input_bucket=event.data["bucket"],
filename=event.data["name"],
mime_type=event.data["contentType"],
time_uploaded=datetime.fromisoformat(event.data["timeCreated"]),
project=os.environ["PROJECT_ID"],
location=os.environ["LOCATION"],
docai_processor_id=os.environ["DOCAI_PROCESSOR"],
docai_location=os.environ.get("DOCAI_LOCATION", "us"),
output_bucket=os.environ["OUTPUT_BUCKET"],
bq_dataset=os.environ["BQ_DATASET"],
bq_table=os.environ["BQ_TABLE"],
)
except Exception as e:
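        # Log the full traceback and return normally; the assumption here is
        # that a failed document should be dropped rather than have the event
        # redelivered by a retry policy.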
logging.exception(e, stack_info=True)


def process_document(
event_id: str,
input_bucket: str,
filename: str,
mime_type: str,
time_uploaded: datetime,
project: str,
location: str,
docai_processor_id: str,
docai_location: str,
output_bucket: str,
bq_dataset: str,
bq_table: str,
):
"""Process a new document.
Args:
event_id: ID of the event.
input_bucket: Name of the input bucket.
filename: Name of the input file.
mime_type: MIME type of the input file.
time_uploaded: Time the input file was uploaded.
project: Google Cloud project ID.
location: Google Cloud location.
docai_processor_id: ID of the Document AI processor.
docai_location: Location of the Document AI processor.
output_bucket: Name of the output bucket.
bq_dataset: Name of the BigQuery dataset.
bq_table: Name of the BigQuery table.
"""
doc_path = f"gs://{input_bucket}/{filename}"
print(f"📖 {event_id}: Getting document text")
doc_text = "\n".join(
get_document_text(
doc_path,
mime_type,
docai_processor_id,
output_bucket,
docai_location,
)
)
print(f"📝 {event_id}: Summarizing document")
print(f" - Text length: {len(doc_text)} characters")
client = genai.Client(vertexai=True, project=project, location=location)
response = client.models.generate_content(
model="gemini-2.0-flash",
contents=doc_text,
config=GenerateContentConfig(
system_instruction=[
"Give me a summary of the following text."
"Use simple language and give examples."
]
),
)
    doc_summary = response.text or ""  # response.text can be None if the model returned no text
print(doc_summary)
print(f" - Summary length: {len(doc_summary)} characters")
print(f"🗃️ {event_id}: Writing document summary to BigQuery: {project}.{bq_dataset}.{bq_table}")
write_to_bigquery(
event_id=event_id,
time_uploaded=time_uploaded,
doc_path=doc_path,
doc_text=doc_text,
doc_summary=doc_summary,
project=project,
bq_dataset=bq_dataset,
bq_table=bq_table,
)
print(f"✅ {event_id}: Done!")


def get_document_text(
input_file: str,
mime_type: str,
processor_id: str,
temp_bucket: str,
docai_location: str = "us",
) -> Iterator[str]:
"""Perform Optical Character Recognition (OCR) with Document AI on a Cloud Storage file.
For more information, see:
https://cloud.google.com/document-ai/docs/process-documents-ocr
Args:
input_file: GCS URI of the document file.
mime_type: MIME type of the document file.
processor_id: ID of the Document AI processor.
temp_bucket: GCS bucket to store Document AI temporary files.
docai_location: Location of the Document AI processor.
Yields: The document text chunks.
"""
# You must set the `api_endpoint` if you use a location other than "us".
documentai_client = documentai.DocumentProcessorServiceClient(
client_options=ClientOptions(api_endpoint=f"{docai_location}-documentai.googleapis.com")
)
# We're using batch_process_documents instead of process_document because
# process_document has a quota limit of 15 pages per document, while
# batch_process_documents has a quota limit of 500 pages per request.
# https://cloud.google.com/document-ai/quotas#general_processors
operation = documentai_client.batch_process_documents(
request=documentai.BatchProcessRequest(
name=processor_id,
input_documents=documentai.BatchDocumentsInputConfig(
gcs_documents=documentai.GcsDocuments(
documents=[
documentai.GcsDocument(gcs_uri=input_file, mime_type=mime_type),
],
),
),
document_output_config=documentai.DocumentOutputConfig(
gcs_output_config=documentai.DocumentOutputConfig.GcsOutputConfig(
gcs_uri=f"gs://{temp_bucket}/ocr/{input_file.split('gs://')[-1]}",
),
),
),
)
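    # Block until the batch operation completes; for very large documents you
    # may want to pass a timeout, e.g. operation.result(timeout=600).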
operation.result()
# Read the results of the Document AI operation from Cloud Storage.
storage_client = storage.Client()
metadata = documentai.BatchProcessMetadata(operation.metadata)
output_gcs_path = metadata.individual_process_statuses[0].output_gcs_destination
(output_bucket, output_prefix) = output_gcs_path.removeprefix("gs://").split("/", 1)
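    # The batch operation writes one or more sharded Document JSON files under
    # the output prefix; parse each shard and yield its text.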
for blob in storage_client.list_blobs(output_bucket, prefix=output_prefix):
blob_contents = blob.download_as_bytes()
document = documentai.Document.from_json(blob_contents, ignore_unknown_fields=True)
yield document.text


def write_to_bigquery(
event_id: str,
time_uploaded: datetime,
doc_path: str,
doc_text: str,
doc_summary: str,
project: str,
bq_dataset: str,
bq_table: str,
) -> None:
"""Write the summary to BigQuery.
Args:
event_id: The Eventarc trigger event ID.
time_uploaded: Time the document was uploaded.
doc_path: Cloud Storage path to the document.
doc_text: Text extracted from the document.
        doc_summary: Summary generated from the document.
project: Google Cloud project ID.
bq_dataset: Name of the BigQuery dataset.
bq_table: Name of the BigQuery table.
"""
bq_client = bigquery.Client(project=project)
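    # Streaming insert; the destination table must already exist with columns
    # matching the keys below (its schema is fetched with get_table()).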
bq_client.insert_rows(
table=bq_client.get_table(f"{bq_dataset}.{bq_table}"),
rows=[
{
"event_id": event_id,
"time_uploaded": time_uploaded,
"time_processed": datetime.now(),
"document_path": doc_path,
"document_text": doc_text,
"document_summary": doc_summary,
},
],
)
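

if __name__ == "__main__":
    # Minimal local smoke test, not used by the deployed function. The bucket,
    # file, and BigQuery names below are placeholders; point them at resources
    # that actually exist in your project before running this module directly.
    process_document(
        event_id="local-test",
        input_bucket="YOUR_INPUT_BUCKET",
        filename="sample.pdf",
        mime_type="application/pdf",
        time_uploaded=datetime.now(),
        project=os.environ["PROJECT_ID"],
        location=os.environ.get("LOCATION", "us-central1"),
        docai_processor_id=os.environ["DOCAI_PROCESSOR"],
        docai_location=os.environ.get("DOCAI_LOCATION", "us"),
        output_bucket="YOUR_OUTPUT_BUCKET",
        bq_dataset="YOUR_DATASET",
        bq_table="YOUR_TABLE",
    )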