function_app/bp_pii_redaction.py (282 lines of code) (raw):

import logging
import os
import re
from base64 import b64encode
from copy import deepcopy
from enum import Enum
from typing import Optional

import azure.functions as func
import fitz
from azure.ai.documentintelligence import DocumentIntelligenceClient
from azure.ai.documentintelligence.models import AnalyzeDocumentRequest
from azure.ai.textanalytics import TextAnalyticsClient
from azure.identity import DefaultAzureCredential, get_bearer_token_provider
from dotenv import load_dotenv
from fitz import Document, get_text_length
from pydantic import BaseModel, Field
from src.helpers.common import MeasureRunTime
from src.helpers.data_loading import load_pymupdf_pdf, pymupdf_pdf_page_to_img_pil
from src.helpers.image import pil_img_to_base64_bytes

logger = logging.getLogger(__name__)

load_dotenv()

aoai_token_provider = get_bearer_token_provider(
    DefaultAzureCredential(), "https://cognitiveservices.azure.com/.default"
)

bp_pii_redaction = func.Blueprint()
TEXT_FUNCTION_ROUTE = "pii_redaction_text"
PDF_FUNCTION_ROUTE = "pii_redaction_pdf"


class PDFTextExtractionMethod(Enum):
    """Sets the option for extracting raw text from a PDF."""

    DOCUMENT_INTELLIGENCE = "DOCUMENT_INTELLIGENCE"
    PYMUPDF = "PYMUPDF"


# Set the text extraction method to be used for PDFs
# - PYMUPDF runs locally and is free, but will only extract text that is
#   embedded into the PDF in the data layer. Any text embedded within images
#   will not be extracted, and no text will be extracted when the PDF is a
#   scanned image.
# - DOCUMENT_INTELLIGENCE uses Azure Document Intelligence to extract all text,
#   including embedded and scanned text.
TEXT_EXTRACTION_METHOD = PDFTextExtractionMethod.DOCUMENT_INTELLIGENCE

# Load environment variables
DOC_INTEL_ENDPOINT = os.getenv("DOC_INTEL_ENDPOINT")
LANGUAGE_ENDPOINT = os.getenv("LANGUAGE_ENDPOINT")

credential = DefaultAzureCredential()

# Create the clients for Azure AI Language and Document Intelligence
text_analytics_client = TextAnalyticsClient(
    endpoint=LANGUAGE_ENDPOINT, credential=credential
)
# Set up the Document Intelligence v4.0 preview client. This will allow us to
# use the latest features of the Document Intelligence service. Check out the
# Document Intelligence Processor Walkthrough Notebook for more information
# (within the `notebooks` folder).
DOC_INTEL_MODEL_ID = "prebuilt-read"  # Set Document Intelligence model ID
di_client = DocumentIntelligenceClient(
    endpoint=DOC_INTEL_ENDPOINT,
    credential=credential,
    api_version="2024-07-31-preview",
)


# Setup Pydantic models for validation of the request and response
class TextFunctionRequestModel(BaseModel):
    """
    Defines the schema that will be expected in the request body. We'll use this to
    ensure that the request contains the correct values and structure, and to allow
    a partially filled request to be processed in case of an error.
    """

    text: Optional[str] = Field(description="The text to be redacted")


class TextFunctionReponseModel(BaseModel):
    """
    Defines the schema that will be returned by the function. We'll use this to
    ensure that the response contains the correct values and structure, and
    to allow a partially filled response to be returned in case of an error.
    """

    success: bool = Field(
        False, description="Indicates whether the pipeline was successful."
    )
    error_text: Optional[str] = Field(
        None,
        description="If an error occurred, this field will contain the error message.",
    )
    func_time_taken_secs: Optional[float] = Field(
        default=None, description="The total time taken to process the request."
    )
    redacted_text: Optional[str] = Field(None, description="The raw redacted text.")
    # The handler stores one stringified result per input document, so this is
    # a list of strings rather than a dict.
    pii_raw_response: Optional[list[str]] = Field(
        None, description="The raw API response from PII recognition."
    )


def replace_text(text: str, replacements: dict) -> str:
    """
    Replace all occurrences of text in a text with replacement text.

    The substitution is done in a single pass over the input, trying longer
    search strings first. This means that a short entity (e.g. "John") cannot
    break up a longer one that contains it (e.g. "John Smith"), and that text
    which has already been inserted as a replacement is never replaced again.
    Empty search strings are ignored.

    :param text:
        The text to be redacted.
    :param replacements:
        A dictionary of text to replace and the text to replace it with.
    :returns:
        The redacted text.
    """
    # Longest first: regex alternation takes the first alternative that
    # matches at a position, so this makes the longest entity win.
    targets = sorted(
        (find_text for find_text in replacements if find_text),
        key=len,
        reverse=True,
    )
    if not targets:
        return text
    pattern = re.compile("|".join(re.escape(target) for target in targets))
    return pattern.sub(lambda match: replacements[match.group(0)], text)


@bp_pii_redaction.route(route=TEXT_FUNCTION_ROUTE)
def redact_pii_text(req: func.HttpRequest) -> func.HttpResponse:
    """
    HTTP-triggered function that redacts PII from the `text` field of a JSON body.

    :param req:
        The incoming HTTP request. The body must be a JSON object matching
        `TextFunctionRequestModel`.
    :returns:
        A JSON response built from `TextFunctionReponseModel`. On success the
        status code is 200; on failure the partially filled model is returned
        with the status code of the stage that failed (422 for request
        validation, 500 for PII recognition).
    """
    logging.info(
        f"Python HTTP trigger function `{TEXT_FUNCTION_ROUTE}` received a request."
    )
    # Create the object to hold all intermediate and final values. We will progressively update
    # values as each stage of the pipeline is completed, allowing us to return a partial
    # response in case of an error at any stage.
    output_model = TextFunctionReponseModel(success=False)
    try:
        # Create error_text and error_code variables. These will be updated as
        # we move through the pipeline so that if a step fails, the vars reflect
        # what has failed. If all steps complete successfully, the vars are
        # never used.
        error_text = "An error occurred during processing."
        error_code = 422

        func_timer = MeasureRunTime()
        func_timer.start()

        ### 1. Check the request body
        req_body = req.get_json()
        try:
            request_obj = TextFunctionRequestModel(**req_body)
        except Exception as _e:
            raise ValueError(
                (
                    "The request body was not in the expected format. Please ensure that it is "
                    "a valid JSON object with the following fields: {}"
                ).format(list(TextFunctionRequestModel.model_fields.keys()))
            ) from _e
        # `text` is nullable in the schema; reject null/empty input here so it
        # is reported as a request error (422) instead of a service error.
        if not request_obj.text:
            raise ValueError("The `text` field must be a non-empty string.")

        ### 2. Prepare the document to send for PII recognition
        error_text = "An error occurred during PII recognition"
        error_code = 500
        documents = [request_obj.text]

        ### 3. Redact PII from the text using Azure AI Language service
        pii_result = text_analytics_client.recognize_pii_entities(
            documents=documents,
        )
        output_model.pii_raw_response = [str(doc_result) for doc_result in pii_result]
        pii_result_doc = pii_result[0]
        if pii_result_doc.is_error:
            raise RuntimeError(
                "An error occurred during PII recognition",
            )

        ### 4. Replace the PII entities with '<<CATEGORY>>' text.
        # This gives us a more readable output than the redacted text from the
        # raw API response (which simply replaces PII with asterisks).
        replacements = {
            entity.text: f"<<{entity.category}>>" for entity in pii_result_doc.entities
        }
        output_model.redacted_text = replace_text(request_obj.text, replacements)

        ### 5. All steps completed successfully, set success=True and return the final result
        output_model.success = True
        output_model.func_time_taken_secs = func_timer.stop()
        return func.HttpResponse(
            body=output_model.model_dump_json(),
            mimetype="application/json",
            status_code=200,
        )
    except Exception:
        # If an error occurred at any stage, return the partial response. Update the error_text
        # field to contain the error message, and ensure success=False.
        output_model.success = False
        output_model.error_text = error_text
        output_model.func_time_taken_secs = func_timer.stop()
        logging.exception(output_model.error_text)
        return func.HttpResponse(
            body=output_model.model_dump_json(),
            mimetype="application/json",
            status_code=error_code,
        )


class PDFFunctionReponseModel(BaseModel):
    """
    Defines the schema that will be returned by the function. We'll use this to
    ensure that the response contains the correct values and structure, and
    to allow a partially filled response to be returned in case of an error.
    """

    success: bool = Field(
        False, description="Indicates whether the pipeline was successful."
    )
    error_text: Optional[str] = Field(
        None,
        description="If an error occurred, this field will contain the error message.",
    )
    func_time_taken_secs: Optional[float] = Field(
        default=None, description="The total time taken to process the request."
    )
    redacted_text: Optional[str] = Field(None, description="The raw redacted text.")
    redacted_pdf: Optional[str] = Field(
        None,
        description="The base64 encoded redacted PDF.",
    )
    input_doc_pdf_page_imgs: Optional[list[bytes]] = Field(
        None,
        description="The base64 encoded input PDF pages.",
    )
    redacted_pdf_page_imgs: Optional[list[bytes]] = Field(
        None,
        description="The base64 encoded redacted PDF pages.",
    )
    di_raw_response: Optional[dict] = Field(
        None, description="The raw API response from Document Intelligence."
    )
    # The handler stores one stringified result per input document, so this is
    # a list of strings rather than a dict.
    pii_raw_response: Optional[list[str]] = Field(
        None, description="The raw API response from PII recognition."
    )


def replace_text_in_pdf(
    doc: Document, replacements: dict, inplace: bool = True
) -> Document:
    """
    Replace all occurrences of text in a PDF with replacement text. This
    function will only work for text that is embedded in the PDF in the data
    layer. Text embedded in images is not supported at this stage and would
    require the Azure Document Intelligence response (with page numbers and
    bounding boxes) to be used to redact the text based on its location in
    the PDF.

    Longer search strings are processed first so that a short entity cannot
    redact part of a longer entity that contains it. Empty search strings are
    ignored.

    :param doc:
        The PDF to be redacted.
    :param replacements:
        A dictionary of text to replace and the text to replace it with.
    :param inplace:
        If True, the PDF will be redacted in-place and returned. If False, a
        new PDF will be returned.
    :returns:
        The redacted PDF.
    """
    # The code in this function has been copied and modified from
    # https://dev.to/abbazs/replace-text-in-pdfs-using-python-42k6
    if not inplace:
        # NOTE(review): relies on `deepcopy` working for a PyMuPDF Document -
        # confirm, or rebuild the copy from `doc.tobytes()` instead.
        doc = deepcopy(doc)

    # Text properties used for every inserted replacement
    font = "helv"  # Helvetica
    default_font_size = 10.0
    color = (255, 0, 0)  # Red, as 0-255 RGB
    # Normalize the color values to range 0 to 1
    normalized_color = tuple(channel / 255 for channel in color)
    baseline_offset = 2.2  # Vertical lift from the rect's bottom edge; adjust as needed

    ordered_replacements = sorted(
        ((find_text, new_text) for find_text, new_text in replacements.items() if find_text),
        key=lambda pair: len(pair[0]),
        reverse=True,
    )

    for page_num, page in enumerate(doc, start=1):
        for find_text, new_text in ordered_replacements:
            # Search for occurrences of find_text
            instances = page.search_for(find_text)
            if not instances:
                # Log the replacement label rather than the matched text: the
                # search string is the PII itself and must not reach the logs.
                logging.info(
                    (
                        f"No occurrences of a '{new_text}' entity found on page {page_num}. "
                        "This text could be embedded in an image which is not yet supported by this function."
                    )
                )
                continue

            # Width of the replacement at the default size is the same for
            # every occurrence, so compute it once per replacement.
            candidate_text_length = get_text_length(
                new_text, fontname=font, fontsize=default_font_size
            )

            for rect in instances:
                # First, redact (remove) the original text
                page.add_redact_annot(rect)
                page.apply_redactions()

                font_size = default_font_size
                # Calculate the max width of the text to be replaced
                max_width = rect.x1 - rect.x0
                # Calculate the baseline position for text insertion
                baseline = fitz.Point(rect.x0, rect.y1 - baseline_offset)

                if candidate_text_length > max_width:
                    # If the replacement text is too wide to fit in the
                    # original location, reduce the font size
                    font_size = font_size * (max_width / candidate_text_length)
                elif candidate_text_length < max_width:
                    # If the text is too short, change the baseline position
                    # to center the text
                    offset = (max_width - candidate_text_length) / 2
                    baseline = fitz.Point(rect.x0 + offset, rect.y1 - baseline_offset)

                # Insert the new text at the adjusted position
                page.insert_text(
                    baseline,
                    new_text,
                    fontsize=font_size,
                    fontname=font,
                    color=normalized_color,
                )
    return doc


@bp_pii_redaction.route(route=PDF_FUNCTION_ROUTE)
def redact_pii_pdf(req: func.HttpRequest) -> func.HttpResponse:
    """
    HTTP-triggered function that redacts PII from a PDF sent in the request body.

    :param req:
        The incoming HTTP request. The Content-Type header must be
        'application/pdf' and the body must be non-empty.
    :returns:
        A JSON response built from `PDFFunctionReponseModel`. On success the
        status code is 200; on failure the partially filled model is returned
        with the status code of the stage that failed. A wrong Content-Type or
        an empty body returns a plain-text 422 response instead.
    """
    logging.info(
        f"Python HTTP trigger function `{PDF_FUNCTION_ROUTE}` received a request."
    )
    output_model = PDFFunctionReponseModel(success=False)
    try:
        # Create error_text and error_code variables. These will be updated as
        # we move through the pipeline so that if a step fails, the vars reflect
        # what has failed. If all steps complete successfully, the vars are
        # never used.
        error_text = "An error occurred while reading the PDF file"
        error_code = 422

        func_timer = MeasureRunTime()
        func_timer.start()

        ### 1. Check the request body
        # Check mime_type of the request data
        mime_type = req.headers.get("Content-Type")
        if mime_type != "application/pdf":
            return func.HttpResponse(
                "This function only supports a Content-Type of 'application/pdf'. Supplied file is of type {}".format(
                    mime_type
                ),
                status_code=error_code,
            )

        req_body = req.get_body()
        if len(req_body) == 0:
            return func.HttpResponse(
                "Please provide a base64 encoded PDF in the request body.",
                status_code=error_code,
            )
        pdf = load_pymupdf_pdf(pdf_bytes=req_body)

        ### 2. Extract the text and images from the PDF
        error_text = "An error occurred during text & image extraction."
        error_code = 500

        input_doc_pdf_page_imgs = [
            pil_img_to_base64_bytes(pymupdf_pdf_page_to_img_pil(page, 80, 0))
            for page in pdf
        ]
        output_model.input_doc_pdf_page_imgs = input_doc_pdf_page_imgs

        # Extract the text from the PDF
        if TEXT_EXTRACTION_METHOD is PDFTextExtractionMethod.PYMUPDF:
            # `get_text()` takes an output-format option as its first
            # argument, not a page number, so it is called without arguments.
            raw_text = "\f".join([page.get_text() for page in pdf])
        elif TEXT_EXTRACTION_METHOD is PDFTextExtractionMethod.DOCUMENT_INTELLIGENCE:
            poller = di_client.begin_analyze_document(
                model_id=DOC_INTEL_MODEL_ID,
                analyze_request=AnalyzeDocumentRequest(bytes_source=req_body),
            )
            di_result = poller.result()
            output_model.di_raw_response = di_result.as_dict()
            raw_text = di_result.content
        else:
            raise ValueError(
                f"Invalid text extraction method: {TEXT_EXTRACTION_METHOD}"
            )

        ### 3. Redact PII from the text using Azure AI Language service
        error_text = "An error occurred during PII recognition."
        documents = [raw_text]
        pii_result = text_analytics_client.recognize_pii_entities(
            documents=documents,
        )
        output_model.pii_raw_response = [str(doc_result) for doc_result in pii_result]
        pii_result_doc = pii_result[0]
        if pii_result_doc.is_error:
            raise RuntimeError("An error occurred during PII recognition")

        ### 4. Replace the PII entities with '<<CATEGORY>>' text.
        # This gives us a more readable output than the redacted text from the
        # raw API response (which simply replaces PII with asterisks).
        replacements = {
            entity.text: f"<<{entity.category}>>" for entity in pii_result_doc.entities
        }
        output_model.redacted_text = replace_text(raw_text, replacements)
        redacted_pdf = replace_text_in_pdf(pdf, replacements)
        output_model.redacted_pdf = b64encode(redacted_pdf.tobytes()).decode("utf-8")
        redacted_pdf_page_imgs = [
            pil_img_to_base64_bytes(pymupdf_pdf_page_to_img_pil(page, 80, 0))
            for page in redacted_pdf
        ]
        output_model.redacted_pdf_page_imgs = redacted_pdf_page_imgs

        ### 5. All steps completed successfully, set success=True and return the final result
        output_model.success = True
        output_model.func_time_taken_secs = func_timer.stop()
        return func.HttpResponse(
            body=output_model.model_dump_json(),
            mimetype="application/json",
            status_code=200,
        )
    except Exception:
        # If an error occurred at any stage, return the partial response. Update the error_text
        # field to contain the error message, and ensure success=False.
        output_model.success = False
        output_model.error_text = error_text
        output_model.func_time_taken_secs = func_timer.stop()
        logging.exception(output_model.error_text)
        return func.HttpResponse(
            body=output_model.model_dump_json(),
            mimetype="application/json",
            status_code=error_code,
        )