document-ai/code/main.py:

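"""Cloud Function that parses a document with Document AI, summarizes it,
and persists the results to Cloud Storage and BigQuery.

Triggered by a change in the input storage bucket. Expects the
FORM_PARSER_PROCESSOR, SUMMARY_PROCESSOR, GCS_OUTPUT, BQ_TABLE_ID and
BQ_LOCATION environment variables to be set.
"""
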
import json
import os

import functions_framework
from google.cloud import bigquery, documentai_v1beta3, storage


def process_document(bucket_name, object_name):
    """Process a document stored in GCS with Document AI."""
    print("Document processing started.")
    client = documentai_v1beta3.DocumentProcessorServiceClient()

    # Download the file. Use the base name so objects nested under
    # "folder" prefixes do not produce a non-existent local path.
    file_path = os.path.join("/tmp", os.path.basename(object_name))
    print(f"Downloading document to {file_path}")
    storage_client = storage.Client()
    bucket = storage_client.get_bucket(bucket_name)
    blob = bucket.blob(object_name)
    blob.download_to_filename(file_path)

    # Read the file into memory.
    with open(file_path, "rb") as document_file:
        document_content = document_file.read()

    # Set the document content in the request.
    document_payload = {"content": document_content, "mime_type": blob.content_type}

    # Configure the form-parser request.
    processor_name = os.getenv("FORM_PARSER_PROCESSOR")
    if not processor_name:
        print("Environment variable FORM_PARSER_PROCESSOR not set")
        return
    request = {"name": processor_name, "document": document_payload}

    # Use the Document AI client to process the request.
    result = client.process_document(request=request)
    document = result.document
    document_text = document.text

    # Extract key/value pairs from the parsed form fields.
    document_dict = {}
    for page in document.pages:
        for form_field in page.form_fields:
            field_name = get_text(form_field.field_name, document)
            field_value = get_text(form_field.field_value, document)
            document_dict[field_name] = field_value

    # Extract a summary with the summarizer processor, reusing the
    # same document payload.
    print("Summarizing document")
    summary_processor_name = os.getenv("SUMMARY_PROCESSOR")
    if not summary_processor_name:
        print("Environment variable SUMMARY_PROCESSOR not set")
        return
    summary_request = {"name": summary_processor_name, "document": document_payload}
    summary_result = client.process_document(request=summary_request)
    summary_document = summary_result.document
    if not summary_document.entities:
        print("Summarizer returned no entities; skipping output")
        return
    summary_text = summary_document.entities[0].mention_text
    print("Document processing complete.")
    process_output(bucket_name, object_name, document_text, summary_text, document_dict)
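
# For reference, the JSON written to the output bucket by process_output()
# has the following shape. The keys under "document_content" depend on the
# fields of the parsed form; the values shown here are illustrative only:
#
# {
#   "document_file_name": "example.pdf",
#   "document_content": {"Name:": "Jane Doe", "Date:": "2024-01-01"},
#   "document_summary": "A short summary produced by the summarizer."
# }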


def get_text(doc_element, document):
    """
    Document AI identifies form fields by their offsets in the full
    document text. This function converts those offsets into text snippets.
    """
    response = ""
    # If a text segment spans several lines, it is stored in
    # multiple text segments.
    for segment in doc_element.text_anchor.text_segments:
        # start_index is a proto field that defaults to 0 when unset,
        # so it can be read directly.
        start_index = int(segment.start_index)
        end_index = int(segment.end_index)
        response += document.text[start_index:end_index]
    return response


def process_output(bucket_name, object_name, document_text, summary_text, document_dict):
    """Save results to the output bucket, move the processed object there,
    and load the JSON result into BigQuery."""
    print("Process output started.")
    storage_client = storage.Client()
    destination_bucket_name = os.environ["GCS_OUTPUT"]
    destination_bucket = storage_client.bucket(destination_bucket_name)

    # Save the raw document text.
    print("Saving raw results into the output bucket...")
    results_text_name = f"{object_name}.text"
    results_text_blob = destination_bucket.blob(results_text_name)
    results_text_blob.upload_from_string(document_text)

    # Save the summary.
    print("Saving summary results into the output bucket...")
    results_summary_name = f"{object_name}.summary"
    results_summary_blob = destination_bucket.blob(results_summary_name)
    results_summary_blob.upload_from_string(summary_text)

    # Save the combined JSON result.
    print("Saving JSON results into the output bucket...")
    results = {
        "document_file_name": object_name,
        "document_content": document_dict,
        "document_summary": summary_text,
    }
    results_json_name = f"{object_name}.json"
    results_json_blob = destination_bucket.blob(results_json_name)
    results_json_blob.upload_from_string(json.dumps(results))

    # Move the object from the input bucket to the output bucket.
    print(f"Moving object {object_name} from {bucket_name} to {destination_bucket_name}")
    source_bucket = storage_client.bucket(bucket_name)
    source_blob = source_bucket.blob(object_name)
    source_bucket.copy_blob(source_blob, destination_bucket, object_name)
    source_bucket.delete_blob(object_name)

    # Persist the results into BigQuery.
    print("Persisting data to BigQuery...")
    bq_client = bigquery.Client()
    table_id = os.getenv("BQ_TABLE_ID")
    if not table_id:
        print("Environment variable BQ_TABLE_ID not set")
        return
    job_config = bigquery.LoadJobConfig(
        schema=[
            bigquery.SchemaField("document_file_name", "STRING"),
            bigquery.SchemaField("document_content", "JSON"),
            bigquery.SchemaField("document_summary", "STRING"),
        ],
        source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    )
    uri = f"gs://{destination_bucket_name}/{results_json_name}"
    print(f"Loading file {uri} into BigQuery")
    load_job = bq_client.load_table_from_uri(
        uri,
        table_id,
        location=os.getenv("BQ_LOCATION"),  # Must match the destination dataset location.
        job_config=job_config,
    )
    load_job.result()  # Wait for the load job to complete.
    print("Process output completed.")


# Triggered by a change in a storage bucket.
@functions_framework.cloud_event
def trigger_gcs(cloud_event):
    data = cloud_event.data

    event_id = cloud_event["id"]
    event_type = cloud_event["type"]

    bucket = data["bucket"]
    name = data["name"]
    metageneration = data["metageneration"]
    time_created = data["timeCreated"]
    updated = data["updated"]

    print(f"Event ID: {event_id}")
    print(f"Event type: {event_type}")
    print(f"Bucket: {bucket}")
    print(f"File: {name}")
    print(f"Metageneration: {metageneration}")
    print(f"Created: {time_created}")
    print(f"Updated: {updated}")

    process_document(bucket, name)
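

# --- Local testing sketch ---
# A minimal way to exercise trigger_gcs without deploying, assuming the
# `cloudevents` package is installed and that "example-input-bucket" and
# "example.pdf" are replaced with a real bucket and object. This block is
# illustrative only and is not part of the deployed function.
if __name__ == "__main__":
    from cloudevents.http import CloudEvent

    attributes = {
        "id": "1234567890",
        "type": "google.cloud.storage.object.v1.finalized",
        "source": "//storage.googleapis.com/projects/_/buckets/example-input-bucket",
    }
    data = {
        "bucket": "example-input-bucket",  # hypothetical bucket name
        "name": "example.pdf",             # hypothetical object name
        "metageneration": "1",
        "timeCreated": "2024-01-01T00:00:00Z",
        "updated": "2024-01-01T00:00:00Z",
    }
    trigger_gcs(CloudEvent(attributes, data))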