incubator-tools/best-practices/utilities/utilities.py (439 lines of code) (raw):

"""This script offers a suite of utility functions designed to simplify and streamline operations associated with Google Cloud Storage (GCS) and Google Cloud's DocumentAI.""" # Import the libraries import difflib import io import json import operator from pathlib import Path from typing import Any, Dict, List, Optional, Sequence, Tuple, Union from google.api_core.client_options import ClientOptions from google.cloud import documentai_v1beta3 as documentai from google.cloud import storage from google.cloud.exceptions import Conflict from google.cloud.exceptions import NotFound from pandas import DataFrame import pandas as pd from PIL import Image pd.options.mode.chained_assignment = None # default='warn' def file_names(gs_file_path: str) -> Tuple[List[str], Dict[str, str]]: """ Retrieves the list of files from a given Google Cloud Storage path. Args: gs_file_path (str): The Google Cloud Storage path in the format "gs://<bucket-name>/<folder-path>/". Returns: tuple: A tuple containing two elements: 1. List of filenames present in the specified path. 2. Dictionary with filenames as keys and their respective full paths in the bucket as values. """ bucket = gs_file_path.split("/")[2] file_names_list = [] file_dict = {} storage_client = storage.Client() source_bucket = storage_client.get_bucket(bucket) filenames = [ filename.name for filename in list( source_bucket.list_blobs(prefix=(("/").join(gs_file_path.split("/")[3:]))) ) ] for i, _ in enumerate(filenames): x = filenames[i].split("/")[-1] if x: file_names_list.append(x) file_dict[x] = filenames[i] return file_names_list, file_dict def check_create_bucket(bucket_name: str) -> storage.bucket.Bucket: """ Checks if a specified Google Cloud Storage bucket exists. If not, it creates one. Primarily used for creating a temporary bucket to store processed files. Args: bucket_name (str): The name of the bucket to check or create. Returns: google.cloud.storage.bucket.Bucket: The bucket object corresponding to the provided bucket name. """ storage_client = storage.Client() try: bucket = storage_client.get_bucket(bucket_name) print(f"Bucket {bucket_name} already exists.") except NotFound: bucket = storage_client.create_bucket(bucket_name) print(f"Bucket {bucket_name} created.") return bucket def bucket_delete(bucket_name: str) -> None: """ Deletes a specified Google Cloud Storage bucket. Primarily used for deleting temporary buckets after their purpose is served. Args: bucket_name (str): The name of the bucket to be deleted. Returns: None. If the bucket exists, it will be deleted. If it doesn't exist or an error occurs, the function will silently pass. """ print("Deleting bucket:", bucket_name) storage_client = storage.Client() try: bucket = storage_client.get_bucket(bucket_name) bucket.delete(force=True) except (NotFound, Conflict): pass def list_blobs(bucket_name: str) -> List[str]: """ Retrieves a list of filenames (blobs) from a specified Google Cloud Storage bucket. Args: bucket_name (str): The name of the bucket from which to retrieve the list of filenames. Returns: list: A list containing the names of all files (blobs) present in the specified bucket. """ blob_list = [] storage_client = storage.Client() blobs = storage_client.list_blobs(bucket_name) for blob in blobs: blob_list.append(blob.name) return blob_list def matching_files_two_buckets( bucket_1: str, bucket_2: str ) -> Tuple[Dict[str, str], Dict[str, str]]: """ Compares the files from two Google Cloud Storage buckets to find files with similar names. Args: bucket_1 (str): Name of the first GCS bucket. bucket_2 (str): Name of the second GCS bucket. Returns: tuple: A tuple containing two dictionaries: 1. matched_files_dict: Dictionary with filenames from bucket_1 as keys and corresponding similar filenames from bucket_2 as values. 2. non_matched_files_dict: Dictionary with filenames from bucket_1 as keys and a message indicating no similar file was found in bucket_2 as values. """ bucket_1_blobs = list_blobs(bucket_1) bucket_2_blobs = list_blobs(bucket_2) matched_files_dict = {} non_matched_files_dict = {} for i in bucket_1_blobs: for j in bucket_2_blobs: matched_score = difflib.SequenceMatcher(None, i, j).ratio() print("matched_score:", matched_score) if matched_score >= 0.8: matched_files_dict[i] = j else: non_matched_files_dict[i] = "No parsed output available" for i in matched_files_dict: if i in non_matched_files_dict: del non_matched_files_dict[i] print("matched_files_dict =", matched_files_dict) print("non_matched_files_dict =", non_matched_files_dict) return matched_files_dict, non_matched_files_dict def documentai_json_proto_downloader( bucket_name: str, blob_name_with_prefix_path: str ) -> documentai.Document: """ Downloads a file from a specified Google Cloud Storage bucket and converts it into a DocumentAI Document proto. Args: bucket_name (str): The name of the GCS bucket from which to download the file. blob_name_with_prefix_path (str): The full path (prefix) to the JSON blob in the bucket. Returns: documentai.Document: A DocumentAI Document proto representation of the downloaded JSON. """ storage_client = storage.Client() bucket = storage_client.bucket(bucket_name) blob = bucket.blob(blob_name_with_prefix_path) doc = documentai.Document.from_json(blob.download_as_bytes()) return doc def copy_blob( bucket_name: str, blob_name: str, destination_bucket_name: str, destination_blob_name: str, ) -> None: """ Copies a blob (file/object) from one GCP storage bucket to another. Args: bucket_name (str): Name of the source bucket. blob_name (str): Name of the blob (file/object) in the source bucket to be copied. destination_bucket_name (str): Name of the destination bucket. destination_blob_name (str): Desired name for the blob in the destination bucket. Output: None. The blob is copied to the destination bucket with the specified name. """ storage_client = storage.Client() source_bucket = storage_client.bucket(bucket_name) source_blob = source_bucket.blob(blob_name) destination_bucket = storage_client.bucket(destination_bucket_name) source_bucket.copy_blob(source_blob, destination_bucket, destination_blob_name) def get_entity_metadata( df: pd.DataFrame, entity: documentai.Document.Entity ) -> pd.DataFrame: """ Parse the entity to extract bounding boxes, entity type, text and page number. Args: entity: Document AI entity object Returns: pandas.DataFrame: A DataFrame representation of the JSON with columns ['type_', 'mention_text', 'bbox', 'page']. 'type_' column indicates the type of entity. 'mention_text' column contains the text of the entity or its property. 'bbox' column contains bounding box coordinates. 'page' column indicates the page number where the entity is found. """ bbox = [] bound_poly = entity.page_anchor.page_refs coordinates_xy = bound_poly[0].bounding_poly.normalized_vertices x1 = [xy.x for xy in coordinates_xy] y1 = [xy.y for xy in coordinates_xy] try: page = entity.page_anchor.page_refs[0].page except IndexError: page = 0 bbox = [ round(min(x1), 8), round(min(y1), 8), round(max(x1), 8), round(max(y1), 8), ] df.loc[len(df.index)] = [ entity.type_, entity.mention_text, bbox, page, ] return df def json_to_dataframe(data: documentai.Document) -> pd.DataFrame: """ Converts a loaded DocumentAI proto JSON into a pandas DataFrame. Args: data (json object): A loaded DocumentAI Document proto JSON. Returns: pandas.DataFrame: A DataFrame representation of the JSON with columns ['type_', 'mention_text', 'bbox', 'page']. 'type_' column indicates the type of entity. 'mention_text' column contains the text of the entity or its property. 'bbox' column contains bounding box coordinates. 'page' column indicates the page number where the entity is found. """ df = pd.DataFrame(columns=["type_", "mention_text", "bbox", "page"]) try: for entity in data.entities: # First, we'll assume it doesn't have properties has_properties = False # Check for subentities (properties) try: for subentity in entity.properties: has_properties = True # Mark that we found properties try: df = get_entity_metadata(df, subentity) except (AttributeError, Exception) as e: print(e) continue except (AttributeError, Exception) as e: print(f"Exception encountered: {e}") continue # If no properties were found for the entity, add it to the dataframe if not has_properties: try: df = get_entity_metadata(df, entity) except (AttributeError, Exception) as e: print(f"Exception encountered: {e}") continue return df except (AttributeError, Exception) as e: print(f"Exception encountered: {e}") return df def blob_downloader(bucket_name: str, blob_name: str) -> Dict: """ Downloads a JSON file from a specified Google Cloud Storage bucket and loads it as a Python dictionary. Args: bucket_name (str): The name of the GCS bucket from which to download the file. blob_name (str): The name or full path to the JSON blob in the bucket. Returns: dict: A dictionary representation of the downloaded JSON file. """ storage_client = storage.Client() bucket = storage_client.bucket(bucket_name) blob = bucket.blob(blob_name) contents = blob.download_as_string() return json.loads(contents.decode()) def bbox_maker(bounding_poly: List[Dict[str, float]]) -> List[float]: """ Converts a bounding polygon (list of coordinates) into a bounding box represented by the minimum and maximum x and y values. Args: bounding_poly (list of dicts): A list of coordinates where each coordinate is a dictionary with "x" and "y" keys. Example: [{"x": 0.5, "y": 0.5}, ...] Returns: list: A list representing the bounding box in the format [min_x, min_y, max_x, max_y]. """ x_list = [] y_list = [] for i in bounding_poly: x_list.append(i["x"]) y_list.append(i["y"]) bbox = [min(x_list), min(y_list), max(x_list), max(y_list)] return bbox def remove_row(df: pd.DataFrame, entity: Any) -> pd.DataFrame: """ Removes rows from a DataFrame where the "type_" column matches the specified entity. Args: df (pandas.DataFrame): The input DataFrame from which rows need to be removed. entity (str): The entity value that should be used to identify rows to be removed. Returns: pandas.DataFrame: A DataFrame with rows removed where the "type_" column matches the specified entity. """ return df[df["type_"] != entity] def find_match( entity_file1: List[Union[str, List[float]]], df_file2: pd.DataFrame ) -> Optional[int]: """ Identifies a matching entity from a DataFrame (`df_file2`) based on the Intersection Over Union (IOU) of bounding boxes with respect to a given entity (`entity_file1`). Args: entity_file1 (list): A list containing entity details from the first file. It should have the format [type_, mention_text, bbox, ...]. df_file2 (pandas.DataFrame): The input DataFrame containing entity details from the second file. Returns: int or None: The index of the matching entity from `df_file2` if found, otherwise None. Note: The function assumes the existence of a function `bb_intersection_over_union` that computes the IOU. """ bbox_file1 = entity_file1[2] # Entity not present in json file if not bbox_file1: return None # Filtering entities with the same name df_file2 = df_file2[df_file2["type_"] == entity_file1[0]] # Calculating IOU values for the entities index_iou_pairs = [] for index, entity_file2 in df_file2.iterrows(): if entity_file2["bbox"]: iou = bb_intersection_over_union(bbox_file1, entity_file2["bbox"]) index_iou_pairs.append((index, iou)) # Choose entity with highest IOU, IOU should be at least > 0.2 matched_index = None for index_iou in sorted(index_iou_pairs, key=operator.itemgetter(1), reverse=True): if index_iou[1] > 0.2: # Threshold matched_index = index_iou[0] break return matched_index def bb_intersection_over_union(box1: Any, box2: List[float]) -> float: """ Calculates the Intersection Over Union (IOU) between two bounding boxes. The bounding boxes are represented as a list of coordinates: [x_min, y_min, x_max, y_max]. Args: box1 (list[float]): Coordinates of the first bounding box. box2 (list[float]): Coordinates of the second bounding box. Returns: float: The IOU between the two bounding boxes. A value between 0 (no overlap) to 1 (complete overlap). Example: box1 = [0.1, 0.1, 0.6, 0.6] box2 = [0.5, 0.5, 1.0, 1.0] iou = bb_intersection_over_union(box1, box2) """ # Determine the coordinates of the intersection rectangle x1 = max(box1[0], box2[0]) y1 = max(box1[1], box2[1]) x2 = min(box1[2], box2[2]) y2 = min(box1[3], box2[3]) # Calculate the area of the intersection rectangle inter_area = max(0, x2 - x1) * max(0, y2 - y1) # If there's no intersection, IOU is 0 if inter_area == 0: return 0 # Calculate the area of each bounding box box1_area = (box1[2] - box1[0]) * (box1[3] - box1[1]) box2_area = (box2[2] - box2[0]) * (box2[3] - box2[1]) # Calculate the IOU iou = inter_area / float(box1_area + box2_area - inter_area) return iou def get_match_ratio(values: List[str]) -> float: """ Calculates the similarity ratio between two strings using SequenceMatcher. Args: values (list[str]): A list containing four elements. The second(index 1) and fourth(index 3) are the strings to be compared. Returns: float: A ratio representing the similarity between the two strings. A value between 0 (no similarity) and 1 (identical strings). Example: values = ["Name1", "apple", "Name2", "apples"] ratio = get_match_ratio(values) """ file1_value = values[1] file2_value = values[2] if file1_value == "Entity not found." or file2_value == "Entity not found.": return 0 return difflib.SequenceMatcher(a=file1_value, b=file2_value).ratio() def compare_pre_hitl_and_post_hitl_output( file1: Any, file2: Any ) -> Tuple[DataFrame, float]: """ Compares the entities between two files and returns the results in a DataFrame. Args: file1 (Any): DocumentAI Object of Json from the first file. file2 (Any): DocumentAI Object of Json from the second file. Returns: Tuple[DataFrame, float]: A tuple where the first element is a DataFrame based on the comparison,and the second element is a float representing the score. """ df_file1 = json_to_dataframe(file1) df_file2 = json_to_dataframe(file2) file1_entities = [entity[0] for entity in df_file1.values] file2_entities = [entity[0] for entity in df_file2.values] # find entities which are present only once in both files # these entities will be matched directly common_entities = set(file1_entities).intersection(set(file2_entities)) exclude_entities = [] for entity in common_entities: if file1_entities.count(entity) > 1 or file2_entities.count(entity) > 1: exclude_entities.append(entity) for entity in exclude_entities: common_entities.remove(entity) df_compare = pd.DataFrame( columns=[ "Entity Type", "Pre_HITL_Output", "Post_HITL_Output", "pre_bbox", "post_bbox", "page1", "page2", ] ) for entity in common_entities: value1 = df_file1[df_file1["type_"] == entity].iloc[0]["mention_text"] value2 = df_file2[df_file2["type_"] == entity].iloc[0]["mention_text"] pre_bbox = df_file1[df_file1["type_"] == entity].iloc[0]["bbox"] post_bbox = df_file2[df_file2["type_"] == entity].iloc[0]["bbox"] page1 = df_file1[df_file1["type_"] == entity].iloc[0]["page"] page2 = df_file2[df_file2["type_"] == entity].iloc[0]["page"] df_compare.loc[len(df_compare.index)] = [ entity, value1, value2, pre_bbox, post_bbox, page1, page2, ] # common entities are removed from df_file1 and df_file2 df_file1 = remove_row(df_file1, entity) df_file2 = remove_row(df_file2, entity) # remaining entities are matched comparing the area of IOU across them mention_text2 = pd.Series(dtype=str) bbox2 = pd.Series(dtype=object) bbox1 = pd.Series(dtype=object) page_1 = pd.Series(dtype=object) page_2 = pd.Series(dtype=object) for index, row in enumerate(df_file1.values): matched_index = find_match(row, df_file2) if matched_index is not None: mention_text2.loc[index] = df_file2.loc[matched_index][1] bbox2.loc[index] = df_file2.loc[matched_index][2] bbox1.loc[index] = row[2] page_2.loc[index] = df_file2.loc[matched_index][3] page_1.loc[index] = row[3] df_file2 = df_file2.drop(matched_index) else: mention_text2.loc[index] = "Entity not found." bbox2.loc[index] = "Entity not found." bbox1.loc[index] = row[2] page_1.loc[index] = row[3] page_2.loc[index] = "no" df_file1["mention_text2"] = mention_text2.values df_file1["bbox2"] = bbox2.values df_file1["bbox1"] = bbox1.values df_file1["page_1"] = page_1.values df_file1["page_2"] = page_2.values df_file1 = df_file1.drop(["bbox"], axis=1) df_file1 = df_file1.drop(["page"], axis=1) df_file1.rename( columns={ "type_": "Entity Type", "mention_text": "Pre_HITL_Output", "mention_text2": "Post_HITL_Output", "bbox1": "pre_bbox", "bbox2": "post_bbox", "page_1": "page1", "page_2": "page2", }, inplace=True, ) df_compare = df_compare.append(df_file1, ignore_index=True) # adding entities which are present in file2 but not in file1 for row in df_file2.values: df_compare.loc[len(df_compare.index)] = [ row[0], "Entity not found.", row[1], "[]", row[2], "[]", row[3], ] # df_compare['Match'] = df_compare['Ground Truth Text'] == df_compare['Output Text'] match_array = [] for i in range(0, len(df_compare)): match_string = "" not_found_string = "Entity not found." pre_output = df_compare.iloc[i]["Pre_HITL_Output"] post_output = df_compare.iloc[i]["Post_HITL_Output"] if pre_output == not_found_string and post_output == not_found_string: match_string = "TN" elif pre_output != not_found_string and post_output == not_found_string: match_string = "FN" elif pre_output == not_found_string and post_output != not_found_string: match_string = "FP" elif pre_output != not_found_string and post_output != not_found_string: match_string = "TP" if pre_output == post_output else "FP" else: match_string = "Something went Wrong." match_array.append(match_string) df_compare["Match"] = match_array df_compare["Fuzzy Ratio"] = df_compare.apply(get_match_ratio, axis=1) if list(df_compare.index): score = df_compare["Fuzzy Ratio"].sum() / len(df_compare.index) else: score = 0 # display(df_compare) return df_compare, score def get_document_schema( location: str, project_number: str, processor_id: str, processor_version_id: str ) -> Any: """ Fetches the document schema of a specific processor version from Google Cloud DocumentAI. Args: location (str): The location of the DocumentAI service ("eu" or other values). project_number (str): The number representing the Google Cloud project. processor_id (str): The ID of the DocumentAI processor. processor_version_id (str): The ID of the processor version. Returns: documentai.types.Document.Schema: The document schema for the specified processor version. Example: schema = get_document_schema("eu", "123456", "processor123", "version123") """ # Choose the endpoint based on the provided location. # You must set the `api_endpoint` if you use a location other than "us". opts = ClientOptions(api_endpoint=f"{location}-documentai.googleapis.com") # Initialize the DocumentAI client. client = documentai.DocumentProcessorServiceClient(client_options=opts) # The full resource name of the processor version # e.g.: projects/project_num/locations/location/processors/ # processor_id/processorVersions/processor_version_id name = client.processor_version_path( project_number, location, processor_id, processor_version_id ) # Fetch the processor version details. response = client.get_processor_version(name=name) # Extract and return the document schema. return response.document_schema def create_pdf_bytes_from_json(gt_json: dict) -> Tuple[bytes, List[Any]]: """ Create PDF bytes from the image content of the ground truth JSON, which will be used for the processing of files. Args: gt_json (dict): The input JSON data containing image content. Returns: bytes: The output PDF in byte format. """ def decode_image(image_bytes: bytes) -> Image.Image: with io.BytesIO(image_bytes) as image_file: image = Image.open(image_file) image.load() return image def create_pdf_from_images( images: Sequence[Image.Image], ) -> bytes: """Creates a PDF from a sequence of images. The PDF will contain 1 page per image, in the same order. Args: images: A sequence of images. Returns: The PDF bytes. """ if not images: raise ValueError("At least one image is required to create a PDF") # PIL PDF saver does not support RGBA images images = [ image.convert("RGB") if image.mode == "RGBA" else image for image in images ] with io.BytesIO() as pdf_file: images[0].save( pdf_file, save_all=True, append_images=images[1:], format="PDF" ) return pdf_file.getvalue() document = documentai.Document.from_json(json.dumps(gt_json)) synthesized_images = [decode_image(page.image.content) for page in document.pages] pdf_bytes = create_pdf_from_images(synthesized_images) return pdf_bytes, synthesized_images def process_document_sample( project_id: str, location: str, processor_id: str, pdf_bytes: bytes, processor_version: str, ): """This function is used to process the files using pdf bytes and provides the processed file""" # You must set the `api_endpoint` if you use a location other than "us". opts = ClientOptions(api_endpoint=f"{location}-documentai.googleapis.com") client = documentai.DocumentProcessorServiceClient(client_options=opts) name = client.processor_version_path( project_id, location, processor_id, processor_version ) raw_document = documentai.RawDocument( content=pdf_bytes, mime_type="application/pdf" ) # Configure the process request request = documentai.ProcessRequest( name=name, raw_document=raw_document, skip_human_review=False ) # Recognizes text entities in the PDF document result = client.process_document(request=request) return result def store_document_as_json(document, bucket_name: str, file_name: str): """ Store Document json in cloud storage. """ storage_client = storage.Client() process_result_bucket = storage_client.get_bucket(bucket_name) document_blob = storage.Blob( name=str(Path(file_name)), bucket=process_result_bucket ) document_blob.upload_from_string(document, content_type="application/json") def convert_and_upload_tiff_to_jpeg( project_id, bucket_name, input_tiff_path, output_jpeg_path ): """ Convert a TIFF file from Google Cloud Storage to a JPEG file and upload it back. Args: project_id (str): The Google Cloud project ID. bucket_name (str): The name of the Google Cloud Storage bucket. input_tiff_path (str): The path of the TIFF file in the bucket to be converted. output_jpeg_path (str): The path where the converted JPEG file will be stored in the bucket. """ from io import BytesIO try: # Initialize Google Cloud Storage client storage_client = storage.Client(project=project_id) bucket = storage_client.get_bucket(bucket_name) # Download TIFF file from GCS blob = bucket.blob(input_tiff_path) tiff_file = BytesIO() blob.download_to_file(tiff_file) tiff_file.seek(0) # Convert to JPEG with Image.open(tiff_file) as im: rgb_im = im.convert("RGB") jpeg_file = BytesIO() rgb_im.save(jpeg_file, "JPEG") jpeg_file.seek(0) # Upload JPEG file to GCS blob = bucket.blob(output_jpeg_path) blob.upload_from_file(jpeg_file, content_type="image/jpeg") except storage.exceptions.GoogleCloudError as gce: print("Google Cloud Storage error: ", gce) return False except OSError as ose: print("File operation error: ", ose) return False except Exception as e: print("An unexpected error occurred: ", e) return False return True def batch_process_documents_sample( project_id: str, location: str, processor_id: str, gcs_input_uri: str, gcs_output_uri: str, processor_version_id: Optional[str] = None, timeout: int = 500, ) -> Any: """It will perform Batch Process on raw input documents Args: project_id (str): GCP project ID location (str): Processor location us or eu processor_id (str): GCP DocumentAI ProcessorID gcs_input_uri (str): GCS path which contains all input files gcs_output_uri (str): GCS path to store processed JSON results processor_version_id (str, optional): VersionID of GCP DocumentAI Processor. Defaults to None. timeout (int, optional): Maximum waiting time for operation to complete. Defaults to 500. Returns: operation.Operation: LRO operation ID for current batch-job """ opts = {"api_endpoint": f"{location}-documentai.googleapis.com"} client = documentai.DocumentProcessorServiceClient(client_options=opts) input_config = documentai.BatchDocumentsInputConfig( gcs_prefix=documentai.GcsPrefix(gcs_uri_prefix=gcs_input_uri) ) output_config = documentai.DocumentOutputConfig( gcs_output_config={"gcs_uri": gcs_output_uri} ) print("Documents are processing(batch-documents)...") name = ( client.processor_version_path( project_id, location, processor_id, processor_version_id ) if processor_version_id else client.processor_path(project_id, location, processor_id) ) request = documentai.types.document_processor_service.BatchProcessRequest( name=name, input_documents=input_config, document_output_config=output_config, ) operation = client.batch_process_documents(request) print("Waiting for operation to complete...") operation.result(timeout=timeout) return operation