fraud-detection-python/cloud-functions/process-invoices/main.py:
# mypy: ignore-errors
"""
Sends Invoices to Document AI API
Saves Extracted Info to BigQuery
Sends addresses to Geocoding PubSub Topic.
"""
import json
import os
import re
from typing import Any, Dict, List
from google.api_core.client_options import ClientOptions
from google.api_core.operation import Operation
from google.cloud import bigquery
from google.cloud import documentai_v1 as documentai
from google.cloud import pubsub_v1
from google.cloud import storage
# Reading environment variables
gcs_output_uri_prefix = os.environ.get("GCS_OUTPUT_URI_PREFIX")
PROJECT_ID = os.environ.get("GCP_PROJECT")
LOCATION = os.environ.get("PARSER_LOCATION")
PROCESSOR_ID = os.environ.get("PROCESSOR_ID")
geocode_request_topicname = os.environ.get("GEOCODE_REQUEST_TOPICNAME")
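# Maximum number of seconds to wait for the Document AI batch operation to complete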
timeout = int(os.environ.get("TIMEOUT"))
# An array of Future objects
# Every call to publish() returns an instance of Future
geocode_futures = []
# Address entity types that will be sent to the geocoding pipeline
address_fields = [
"receiver_address",
"remit_to_address",
"ship_from_address",
"ship_to_address",
"supplier_address",
]
# GCS Variables
gcs_output_bucket = f"{PROJECT_ID}-output-invoices"
gcs_archive_bucket_name = f"{PROJECT_ID}-archived-invoices"
destination_uri = f"gs://{gcs_output_bucket}/{gcs_output_uri_prefix}/"
DATASET_NAME = "invoice_parser_results"
ENTITIES_TABLE_NAME = "doc_ai_extracted_entities"
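# API clients are created once at import time so they can be reused across function invocations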
client_options = ClientOptions(api_endpoint=f"{LOCATION}-documentai.googleapis.com")
docai_client = documentai.DocumentProcessorServiceClient(client_options=client_options)
storage_client = storage.Client()
bq_client = bigquery.Client()
pub_client = pubsub_v1.PublisherClient()
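# File types this function will send to Document AI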
ACCEPTED_MIME_TYPES = {
    "application/pdf",
    "image/jpeg",
    "image/png",
    "image/tiff",
    "image/gif",
}
def write_to_bq(dataset_name: str, table_name: str, entities_extracted_dict: dict):
"""
Write Data to BigQuery
"""
dataset_ref = bq_client.dataset(dataset_name)
table_ref = dataset_ref.table(table_name)
row_to_insert = []
row_to_insert.append(entities_extracted_dict)
    # Round-trip through JSON to make sure the row is JSON-serializable
    json_data = json.dumps(row_to_insert, sort_keys=False)
    json_object = json.loads(json_data)
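    # Let the load job add new columns and relax REQUIRED fields as new entity types appear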
schema_update_options = [
bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION,
bigquery.SchemaUpdateOption.ALLOW_FIELD_RELAXATION,
]
source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
job_config = bigquery.LoadJobConfig(
schema_update_options=schema_update_options,
source_format=source_format,
)
job = bq_client.load_table_from_json(json_object, table_ref, job_config=job_config)
print(job.result()) # Waits for table load to complete.
def extract_document_entities(document: documentai.Document) -> dict:
"""
Get all entities from a document and output as a dictionary
Flattens nested entities/properties
Format: entity.type_: entity.mention_text OR entity.normalized_value.text
"""
document_entities: Dict[str, Any] = {}
def extract_document_entity(entity: documentai.Document.Entity):
"""
Extract Single Entity and Add to Entity Dictionary
"""
entity_key = entity.type_.replace("/", "_")
        normalized_value = getattr(entity, "normalized_value", None)
        # Prefer the normalized value when present; fall back to the raw mention text
        new_entity_value = (
            normalized_value.text
            if normalized_value and normalized_value.text
            else entity.mention_text
        )
existing_entity = document_entities.get(entity_key)
# For entities that can have multiple (e.g. line_item)
if existing_entity:
            # Change Entity Type to a List
            if not isinstance(existing_entity, list):
                existing_entity = [existing_entity]
existing_entity.append(new_entity_value)
document_entities[entity_key] = existing_entity
else:
document_entities.update({entity_key: new_entity_value})
for entity in document.entities:
# Fields detected. For a full list of fields for each processor see
# the processor documentation:
# https://cloud.google.com/document-ai/docs/processors-list
extract_document_entity(entity)
# Properties are Sub-Entities
for prop in entity.properties:
extract_document_entity(prop)
return document_entities
def _batch_process_documents(
project_id: str,
location: str,
processor_id: str,
gcs_input_uri: str,
gcs_output_uri: str,
) -> Operation:
"""
Constructs a request to process a document using the Document AI
Batch Method.
"""
    # The full resource name of the processor, e.g.:
    # projects/{project_id}/locations/{location}/processors/{processor_id}
# You must create new processors in the Cloud Console first
resource_name = docai_client.processor_path(project_id, location, processor_id)
# Load GCS Input URI Prefix into Input Config Object
input_config = documentai.BatchDocumentsInputConfig(
gcs_prefix=documentai.GcsPrefix(gcs_uri_prefix=gcs_input_uri)
)
# Cloud Storage URI for Output directory
gcs_output_config = documentai.DocumentOutputConfig.GcsOutputConfig(
gcs_uri=gcs_output_uri
)
# Load GCS Output URI into Output Config Object
output_config = documentai.DocumentOutputConfig(gcs_output_config=gcs_output_config)
# Configure Process Request
request = documentai.BatchProcessRequest(
name=resource_name,
input_documents=input_config,
document_output_config=output_config,
)
# Future for long-running operations returned from Google Cloud APIs.
operation = docai_client.batch_process_documents(request)
return operation
def get_document_protos_from_gcs(
output_bucket: str, output_directory: str
) -> List[documentai.Document]:
"""
Download document proto output from GCS. (Directory)
"""
# List of all of the files in the directory
# `gs://gcs_output_uri/operation_id`
blob_list = list(storage_client.list_blobs(output_bucket, prefix=output_directory))
document_protos = []
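    # Large documents may be split across multiple JSON shards in the output directory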
for blob in blob_list:
        # Document AI should only output JSON files to GCS
        if blob.name.endswith(".json"):
print("Fetching from " + blob.name)
            document_proto = documentai.Document.from_json(
                blob.download_as_bytes()
            )
document_protos.append(document_proto)
else:
print(f"Skipping non-supported file type {blob.name}")
return document_protos
def cleanup_gcs(
input_bucket: str,
input_filename: str,
output_bucket: str,
output_directory: str,
archive_bucket: str,
):
"""
Deleting the intermediate files created by the Doc AI Parser
Moving Input Files to Archive
"""
# Intermediate document.json files
blob_list = list(storage_client.list_blobs(output_bucket, prefix=output_directory))
for blob in blob_list:
blob.delete()
# Copy input file to archive bucket
source_bucket = storage_client.bucket(input_bucket)
source_blob = source_bucket.blob(input_filename)
destination_bucket = storage_client.bucket(archive_bucket)
source_bucket.copy_blob(source_blob, destination_bucket, input_filename)
# delete from the input folder
source_blob.delete()
def process_address(address_type: str, address_value: str, input_filename: str):
"""
    Creating and publishing a message to the geocoding Pub/Sub topic
"""
message = {
"entity_type": address_type,
"entity_text": address_value,
"input_file_name": input_filename,
}
message_data = json.dumps(message).encode("utf-8")
geocode_topic_path = pub_client.topic_path(PROJECT_ID, geocode_request_topicname)
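    # publish() is asynchronous and returns a Future that resolves to the published message ID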
geocode_future = pub_client.publish(geocode_topic_path, data=message_data)
geocode_futures.append(geocode_future)
# pylint: disable=unused-argument
def process_invoice(event, context):
"""
Extract Invoice Entities and Save to BQ
"""
input_bucket = event.get("bucket")
input_filename = event.get("name")
mime_type = event.get("contentType")
if not input_bucket or not input_filename:
print("No bucket or filename provided")
return
    if mime_type not in ACCEPTED_MIME_TYPES:
        print(f"Cannot parse the file type: {mime_type}")
        return
    print(f"Mime Type: {mime_type}")
gcs_input_uri = f"gs://{input_bucket}/{input_filename}"
print("Input File: " + gcs_input_uri)
operation = _batch_process_documents(
PROJECT_ID, LOCATION, PROCESSOR_ID, gcs_input_uri, destination_uri
)
print("Document Processing Operation: " + operation.operation.name)
# Wait for the operation to finish
operation.result(timeout=timeout)
    # Output files will be in a new subdirectory with Operation ID as the name
    operation_id_match = re.search(
        r"operations/(\d+)", operation.operation.name, re.IGNORECASE
    )
    if not operation_id_match:
        print("Could not parse operation ID from " + operation.operation.name)
        return
    operation_id = operation_id_match.group(1)
output_directory = f"{gcs_output_uri_prefix}/{operation_id}"
print(f"Output Path: gs://{gcs_output_bucket}/{output_directory}")
print("Output files:")
output_document_protos = get_document_protos_from_gcs(
gcs_output_bucket, output_directory
)
# Reading all entities into a dictionary to write into a BQ table
for document_proto in output_document_protos:
entities = extract_document_entities(document_proto)
entities["input_file_name"] = input_filename
print("Entities:", entities)
print("Writing DocAI Entities to BQ")
# Add Entities to DocAI Extracted Entities Table
        write_to_bq(DATASET_NAME, ENTITIES_TABLE_NAME, entities)
# Send Address Data to PubSub
for address_field in address_fields:
if address_field in entities:
process_address(address_field, entities[address_field], input_filename)
cleanup_gcs(
input_bucket,
input_filename,
gcs_output_bucket,
output_directory,
gcs_archive_bucket_name,
)
return