# function_app/bp_content_understanding_document.py

import json import logging import os from typing import Optional import azure.functions as func from azure.identity import DefaultAzureCredential, get_bearer_token_provider from dotenv import load_dotenv from pydantic import BaseModel, Field from src.components.content_understanding_client import ( AzureContentUnderstandingClient, get_existing_analyzer_ids, ) from src.helpers.common import MeasureRunTime from src.helpers.content_understanding import ( cu_fields_dict_to_markdown, draw_fields_on_imgs, enrich_extracted_cu_fields, ) from src.helpers.data_loading import load_visual_obj_bytes_to_pil_imgs_dict from src.helpers.image import pil_img_to_base64_bytes, resize_img_by_max, rotate_img_pil load_dotenv() token_provider = get_bearer_token_provider( DefaultAzureCredential(), "https://cognitiveservices.azure.com/.default" ) bp_content_understanding_document = func.Blueprint() FUNCTION_ROUTE = "content_understanding_document" # Load environment variables CONTENT_UNDERSTANDING_ENDPOINT = os.getenv("CONTENT_UNDERSTANDING_ENDPOINT") # Load existing analyzer schemas with open("config/content_understanding_schemas.json", "r") as f: CONTENT_UNDERSTANDING_SCHEMAS = json.load(f) cu_client = AzureContentUnderstandingClient( endpoint=CONTENT_UNDERSTANDING_ENDPOINT, azure_ad_token_provider=token_provider, api_version="2024-12-01-preview", enable_face_identification=False, ) # Get list of existing CU analyzers existing_cu_analyzer_ids = get_existing_analyzer_ids(cu_client) class FunctionReponseModel(BaseModel): """ Defines the schema that will be returned by the function. We'll use this to ensure that the response contains the correct values and structure, and to allow a partially filled response to be returned in case of an error. """ success: bool = Field( default=False, description="Indicates whether the pipeline was successful." 
) error_text: Optional[str] = Field( default=None, description="If an error occurred, this field will contain the error message.", ) func_time_taken_secs: Optional[float] = Field( default=None, description="The total time taken to process the request." ) enriched_cu_fields: Optional[dict] = Field( default=None, description="The enriched fields result from Content Understanding.", ) formatted_fields_md: Optional[str] = Field( default=None, description="The field outputs in a pre-formatted markdown format.", ) cu_raw_response: Optional[dict] = Field( default=None, description="The raw API response from Content Understanding." ) cu_time_taken_secs: Optional[float] = Field( default=None, description="The time taken to extract the text using Content Understanding.", ) result_imgs_with_bboxes: Optional[dict[int, bytes]] = Field( default=None, description="Dictionary of page images with bounding boxes drawn around the extracted fields.", ) @bp_content_understanding_document.route(route=FUNCTION_ROUTE) def content_understanding_document( req: func.HttpRequest, ) -> func.HttpResponse: """ This function processes a request to extract fields from a PDF document using Azure Content Understanding. If an error occurs at any stage, the function will return a partial response with the error message and the fields that have been populated up to that point. """ logging.info(f"Python HTTP trigger function `{FUNCTION_ROUTE}` received a request.") # Create the object to hold all intermediate and final values. We will progressively update # values as each stage of the pipeline is completed, allowing us to return a partial # response in case of an error at any stage. output_model = FunctionReponseModel(success=False) try: # Create error_text and error_code variables. These will be updated as # we move through the pipeline so that if a step fails, the vars reflect # what has failed. If all steps complete successfully, the vars are # never used. 
error_text = "An error occurred during processing." error_code = 422 func_timer = MeasureRunTime() func_timer.start() # Check the request body request_json_content = json.loads(req.files["json"].read().decode("utf-8")) analyzer_id = request_json_content.get("analyzer_id", None) file_bytes = req.files["file"].read() file_mime_type = req.files["file"].content_type ### 1. Load the images from the PDF/image input error_text = "An error occurred during image extraction." error_code = 500 doc_page_imgs = load_visual_obj_bytes_to_pil_imgs_dict( file_bytes, file_mime_type, starting_idx=1, pdf_img_dpi=100 ) ### 2. Ensure the analyzer exists error_text = "Invalid Analyzer ID." error_code = 500 # Check if the analyzer already exists when the resource was last checked. global existing_cu_analyzer_ids if analyzer_id not in existing_cu_analyzer_ids: # Refresh the list of existing analyzers existing_cu_analyzer_ids = get_existing_analyzer_ids(cu_client) if analyzer_id not in existing_cu_analyzer_ids: # Analyzer is not available or deployed in the resource. raise KeyError( ( f"Analyzer ID '{analyzer_id}' is not available. " "Ensure that the Analyzer has already been created within the AI services resource." ) ) ### 2. Extract the content using Content Understanding error_text = "An error occurred during Content Understanding extraction." with MeasureRunTime() as cu_timer: response = cu_client.begin_analyze( analyzer_id=analyzer_id, file_bytes=file_bytes, ) cu_result = cu_client.poll_result(response) output_model.cu_raw_response = cu_result output_model.cu_time_taken_secs = cu_timer.time_taken ### 3. Enrich the raw CU API result with additional metadata (normalized polygons, page number etc) error_text = "An error occurred during post-processing." enriched_cu_fields = enrich_extracted_cu_fields( cu_result["result"]["contents"][0] ) output_model.enriched_cu_fields = enriched_cu_fields ### 4. 
Format the fields in markdown for easy reading output_model.formatted_fields_md = cu_fields_dict_to_markdown( enriched_cu_fields ) ### 5. Draw bounding boxes of the extracted fields on images of the input document # With the locations of the extracted fields now known, we can draw # bounding boxes on the image and to make it easier to digest the output. # Draw the bounding boxes on the image pil_imgs = draw_fields_on_imgs( enriched_fields=enriched_cu_fields, pil_imgs=doc_page_imgs ) # Resize the images to reduce transfer size pil_imgs = { page_num: resize_img_by_max(pil_img, max_height=1000, max_width=1000) for page_num, pil_img in pil_imgs.items() } # Rotate the images to be the correct orientation for page_list_idx, (page_num, pil_img) in enumerate(pil_imgs.items()): pil_imgs[page_num] = rotate_img_pil( pil_img, angle=cu_result["result"]["contents"][0]["pages"][page_list_idx][ "angle" ], ) output_model.result_imgs_with_bboxes = { page_num: pil_img_to_base64_bytes(pil_img) for page_num, pil_img in pil_imgs.items() } ### 5. All steps completed successfully, set success=True and return the final result output_model.success = True output_model.func_time_taken_secs = func_timer.stop() return func.HttpResponse( body=output_model.model_dump_json(), mimetype="application/json", status_code=200, ) except Exception as _e: # If an error occurred at any stage, return the partial response. Update the error_text # field to contain the error message, and ensure success=False. output_model.success = False output_model.error_text = error_text output_model.func_time_taken_secs = func_timer.stop() logging.exception(output_model.error_text) return func.HttpResponse( body=output_model.model_dump_json(), mimetype="application/json", status_code=error_code, )