webhook/document_extract.py (48 lines of code) (raw):

# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import re

from google.cloud import storage
from google.cloud import vision


def async_document_extract(
    bucket: str,
    name: str,
    output_bucket: str,
    timeout: int = 420,
) -> str:
    """Perform OCR with PDF/TIFF as source files on GCS.

    Original sample is here:
    https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/vision/snippets/detect/detect.py#L806

    Note: This function can cause the IOPub data rate to be exceeded on a
    Jupyter server. This rate can be changed by setting the variable
    `--ServerApp.iopub_data_rate_limit`.

    Args:
        bucket (str): GCS URI of the bucket containing the PDF/TIFF files.
        name (str): name of the PDF/TIFF file.
        output_bucket (str): bucket to store output in.
        timeout (int): Timeout in seconds for the request.

    Returns:
        str: the complete text
    """
    gcs_source_uri = f"gs://{bucket}/{name}"
    prefix = "ocr"
    gcs_destination_uri = f"gs://{output_bucket}/{prefix}/"
    mime_type = "application/pdf"
    # Number of pages grouped into each output JSON file.
    batch_size = 2

    # Perform Vision OCR
    client = vision.ImageAnnotatorClient()
    feature = vision.Feature(type_=vision.Feature.Type.DOCUMENT_TEXT_DETECTION)

    gcs_source = vision.GcsSource(uri=gcs_source_uri)
    input_config = vision.InputConfig(gcs_source=gcs_source, mime_type=mime_type)

    gcs_destination = vision.GcsDestination(uri=gcs_destination_uri)
    output_config = vision.OutputConfig(
        gcs_destination=gcs_destination, batch_size=batch_size
    )

    async_request = vision.AsyncAnnotateFileRequest(
        features=[feature], input_config=input_config, output_config=output_config
    )

    operation = client.async_batch_annotate_files(requests=[async_request])

    print("OCR: waiting for the operation to finish.")
    operation.result(timeout=timeout)

    # Once the request has completed and the output has been
    # written to GCS, we can list all the output files.
    return get_ocr_output_from_bucket(gcs_destination_uri, output_bucket)


def get_ocr_output_from_bucket(gcs_destination_uri: str, bucket_name: str) -> str:
    """Iterates over blobs in output bucket to get full OCR result.

    Arguments:
        gcs_destination_uri: the URI where the OCR output was saved.
        bucket_name: the name of the bucket where the output was saved.

    Returns:
        The full text of the document.

    Raises:
        ValueError: if `gcs_destination_uri` is not a valid `gs://` URI.
    """
    storage_client = storage.Client()

    match = re.match(r"gs://([^/]+)/(.+)", gcs_destination_uri)
    if match is None:
        # Fail loudly instead of raising AttributeError on match.group below.
        raise ValueError(f"Invalid GCS URI: {gcs_destination_uri!r}")
    prefix = match.group(2)
    bucket = storage_client.get_bucket(bucket_name)

    # List objects with the given prefix, filtering out folders.
    blob_list = [
        blob
        for blob in bucket.list_blobs(prefix=prefix)
        if not blob.name.endswith("/")
    ]

    # GCS lists blobs lexicographically, so "output-10-to-..." would sort
    # before "output-2-to-...". Order numerically by starting page instead,
    # so the concatenated text follows the document's page order.
    def _start_page(blob_name: str) -> int:
        m = re.search(r"output-(\d+)-to-", blob_name)
        return int(m.group(1)) if m else 0

    blob_list.sort(key=lambda blob: _start_page(blob.name))

    # Concatenate all text from the blobs.
    texts: list[str] = []
    for output in blob_list:
        json_string = output.download_as_bytes().decode("utf-8")
        response = json.loads(json_string)

        # Each output file holds up to `batch_size` per-page responses.
        # Fix: the original read only responses[0], silently dropping every
        # page after the first within each batch.
        for page_response in response["responses"]:
            # Pages with no detected text may omit fullTextAnnotation.
            annotation = page_response.get("fullTextAnnotation")
            if annotation:
                texts.append(annotation["text"])

    return "".join(texts)