# community/expense-parser-python/cloud-functions/main.py
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import os
from google.cloud import bigquery
from google.cloud import documentai_v1 as documentai
from google.cloud import storage
# Read environment variables
project_id = os.environ.get("GCP_PROJECT")
location = os.environ.get("PARSER_LOCATION")
processor_id = os.environ.get("PROCESSOR_ID")
dataset_name = os.environ.get("BQ_DATASET_NAME")
table_name = os.environ.get("BQ_TABLE_NAME")
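# Illustrative values only (hypothetical, set on the function at deploy time), e.g.:
#   GCP_PROJECT=my-project  PARSER_LOCATION=us  PROCESSOR_ID=1234567890abcdef
#   BQ_DATASET_NAME=expense_parser  BQ_TABLE_NAME=extracted_entities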
# Set variables
gcs_output_uri = f"gs://{project_id}-output-receipts"
gcs_archive_bucket_name = f"{project_id}-archived-receipts"
gcs_rejected_bucket_name = f"{project_id}-rejected-files"
processor = f"projects/{project_id}/locations/{location}/processors/{processor_id}"
# Content types that will be sent to Document AI; anything else is rejected
accepted_file_types = [
"application/pdf",
"image/jpg",
"image/png",
"image/gif",
"image/tiff",
"image/jpeg",
"image/tif",
"image/webp",
"image/bmp",
]
# Create GCP clients
docai_client = documentai.DocumentProcessorServiceClient()
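# Note: the default endpoint assumes a "us" processor location; other locations
# may require client_options with api_endpoint=f"{location}-documentai.googleapis.com".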
storage_client = storage.Client()
bq_client = bigquery.Client()
# Entry point for the Cloud Function, triggered when a file lands in the input bucket
def process_receipt(event, context):
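    """Process a receipt uploaded to the input bucket.

    If the content type is supported, the file is parsed with Document AI,
    the extracted entities are written to BigQuery, and the file is moved to
    the archive bucket; otherwise it is moved to the rejected bucket.
    """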
input_bucket_name = event["bucket"]
file_name = event["name"]
content_type = event["contentType"]
if content_type in accepted_file_types:
file = get_gcs_file(file_name, input_bucket_name)
extracted_doc = extract_entities(file, content_type)
extracted_list = format_entities(extracted_doc)
write_to_bq(dataset_name, table_name, extracted_list, file_name)
        # Move the input file to the archive bucket (copy_blob deletes the source)
        copy_blob(input_bucket_name, gcs_archive_bucket_name, file_name)
else:
print("Cannot parse the file type.")
# move file to designated bucket if file type is not supported
copy_blob(input_bucket_name, gcs_rejected_bucket_name, file_name)
return
# write data to a BigQuery table
def write_to_bq(dataset_name, table_name, extracted_list, file_name):
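    """Load the extracted entity rows into the target BigQuery table."""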
dataset_ref = bq_client.dataset(dataset_name)
table_ref = dataset_ref.table(table_name)
for item in extracted_list:
item["doc_name"] = file_name
json_data = json.dumps(extracted_list, sort_keys=False)
    # Round-trip through JSON so the rows are plain, JSON-serializable Python objects
json_object = json.loads(json_data)
job_config = bigquery.LoadJobConfig(
source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
ignore_unknown_values=True,
schema=[
bigquery.SchemaField("doc_name", "STRING"),
bigquery.SchemaField("confidence", "FLOAT"),
bigquery.SchemaField("type_", "STRING"),
bigquery.SchemaField("mention_text", "STRING"),
],
)
job = bq_client.load_table_from_json(json_object, table_ref, job_config=job_config)
job.result() # Waits for table load to complete.
def get_gcs_file(file_name, bucket_name):
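    """Download a GCS object and return its contents as bytes."""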
bucket = storage_client.get_bucket(bucket_name)
gcs_file = bucket.get_blob(file_name)
file_blob = gcs_file.download_as_bytes()
return file_blob
def extract_entities(file, content_type):
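    """Send the file to the Document AI processor and return the parsed document."""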
document = {"content": file, "mime_type": content_type}
request = {"name": processor, "raw_document": document, "skip_human_review": True}
results = docai_client.process_document(request)
print("Entities extracted from DocAI.")
return results.document
# Format all entities into a list to write into a BQ table
def format_entities(extracted_doc):
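    """Flatten entities into dicts of type_, mention_text, and confidence.

    line_item entities are expanded into one row per property, with a row
    also emitted for the line item itself.
    """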
result_ents = []
for entity in extracted_doc.entities:
entity_type = str(entity.type_)
if entity_type == "line_item":
            for prop in entity.properties:
                ents = {
                    "type_": prop.type_,
                    "mention_text": prop.mention_text,
                    "confidence": prop.confidence,
                }
                result_ents.append(ents)
ents = {
"type_": entity.type_,
"mention_text": entity.mention_text,
"confidence": entity.confidence,
}
result_ents.append(ents)
else:
ents = {
"type_": entity_type,
"mention_text": entity.mention_text,
"confidence": entity.confidence,
}
result_ents.append(ents)
print("Formatted entities to a list.")
return result_ents
# Move a blob to another bucket by name: copy it, then delete the source
def copy_blob(source_bucket_name, dest_bucket_name, blob_name):
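    """Copy a blob into the destination bucket, then delete it from the source."""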
source_bucket = storage_client.get_bucket(source_bucket_name)
dest_bucket = storage_client.get_bucket(dest_bucket_name)
blob = source_bucket.get_blob(blob_name)
source_bucket.copy_blob(blob, dest_bucket, blob_name)
blob.delete()
return
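

# Minimal local-invocation sketch (not part of the deployed function). It assumes the
# environment variables above are set, application-default credentials are available,
# and "sample-receipt.pdf" already exists in the hypothetical input bucket below.
if __name__ == "__main__":
    sample_event = {
        "bucket": f"{project_id}-input-receipts",  # hypothetical input bucket name
        "name": "sample-receipt.pdf",  # hypothetical object name
        "contentType": "application/pdf",
    }
    process_receipt(sample_event, context=None)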