code/embedding-function/utilities/helpers/azure_form_recognizer_helper.py (133 lines of code) (raw):

import html
import logging
import traceback

from azure.ai.formrecognizer import DocumentAnalysisClient
from azure.core.credentials import AzureKeyCredential
from azure.identity import DefaultAzureCredential

from .env_helper import EnvHelper

logger = logging.getLogger(__name__)


class AzureFormRecognizerClient:
    """Analyze documents with Form Recognizer and render each page as text.

    The rendered text wraps recognised paragraph roles in HTML tags (see
    ``form_recognizer_role_to_html``) and replaces table spans with an HTML
    ``<table>`` built by ``_table_to_html``.
    """

    def __init__(self) -> None:
        """Build the ``DocumentAnalysisClient`` from ``EnvHelper`` settings.

        ``DefaultAzureCredential`` is used when ``AZURE_AUTH_TYPE`` is
        ``"rbac"``; otherwise the client authenticates with
        ``AZURE_FORM_RECOGNIZER_KEY`` (which is only read and stored on the
        instance in that case).
        """
        env_helper: EnvHelper = EnvHelper()

        self.AZURE_FORM_RECOGNIZER_ENDPOINT: str = (
            env_helper.AZURE_FORM_RECOGNIZER_ENDPOINT
        )
        if env_helper.AZURE_AUTH_TYPE == "rbac":
            credential = DefaultAzureCredential()
        else:
            self.AZURE_FORM_RECOGNIZER_KEY: str = env_helper.AZURE_FORM_RECOGNIZER_KEY
            credential = AzureKeyCredential(self.AZURE_FORM_RECOGNIZER_KEY)

        # Both auth modes share the same endpoint and user-agent header; only
        # the credential differs.
        self.document_analysis_client = DocumentAnalysisClient(
            endpoint=self.AZURE_FORM_RECOGNIZER_ENDPOINT,
            credential=credential,
            headers={
                "x-ms-useragent": "chat-with-your-data-solution-accelerator/1.0.0"
            },
        )

    # Paragraph role -> HTML tag used to wrap that paragraph. A value of
    # ``None`` (and any role missing from the mapping) produces no tag.
    form_recognizer_role_to_html = {
        "title": "h1",
        "sectionHeading": "h2",
        "pageHeader": None,
        "pageFooter": None,
        "paragraph": "p",
    }

    def _table_to_html(self, table):
        """Render *table* as an HTML ``<table>`` string.

        Cells are grouped by ``row_index`` (cells whose index falls outside
        ``range(table.row_count)`` are ignored) and ordered by
        ``column_index`` within each row. Header cells become ``<th>``, all
        others ``<td>``; cell content is HTML-escaped.
        """
        # Bucket the cells in one pass instead of rescanning every cell for
        # every row.
        rows = [[] for _ in range(table.row_count)]
        for cell in table.cells:
            if 0 <= cell.row_index < table.row_count:
                rows[cell.row_index].append(cell)

        pieces = ["<table>"]
        for row_cells in rows:
            pieces.append("<tr>")
            for cell in sorted(row_cells, key=lambda c: c.column_index):
                tag = "th" if cell.kind in ("columnHeader", "rowHeader") else "td"
                cell_spans = ""
                if cell.column_span > 1:
                    cell_spans += f" colSpan={cell.column_span}"
                if cell.row_span > 1:
                    cell_spans += f" rowSpan={cell.row_span}"
                pieces.append(
                    f"<{tag}{cell_spans}>{html.escape(cell.content)}</{tag}>"
                )
            pieces.append("</tr>")
        pieces.append("</table>")
        return "".join(pieces)

    def _paragraph_tags(self, paragraphs):
        """Map each paragraph's start offset to ``(end_offset, html_tag)``.

        Only the first span of a paragraph is considered. A paragraph without
        a role is treated as ``"paragraph"``. ``html_tag`` is ``None`` when
        the role has no tag in ``form_recognizer_role_to_html``. Paragraphs
        with no spans are skipped because they have no position to mark.
        """
        tags_by_start = {}
        for paragraph in paragraphs:
            if not paragraph.spans:
                continue
            span = paragraph.spans[0]
            role = paragraph.role if paragraph.role is not None else "paragraph"
            tags_by_start[span.offset] = (
                span.offset + span.length,
                self.form_recognizer_role_to_html.get(role),
            )
        return tags_by_start

    @staticmethod
    def _table_char_map(tables_on_page, page_offset, page_length):
        """Return a per-character list marking which table covers each position.

        Entry ``i`` holds the index (into *tables_on_page*) of the table whose
        span covers character ``page_offset + i``, or ``-1`` when no table
        does. Table spans are clamped to the page span.
        """
        table_chars = [-1] * page_length
        for table_id, table in enumerate(tables_on_page):
            for span in table.spans:
                first = max(span.offset - page_offset, 0)
                last = min(span.offset - page_offset + span.length, page_length)
                for idx in range(first, last):
                    table_chars[idx] = table_id
        return table_chars

    def _render_page(
        self, content, page_offset, table_chars, tables_on_page, paragraph_tags
    ):
        """Build the text of one page, followed by a single trailing space.

        Characters outside tables are copied from *content*; each table is
        emitted once, as HTML, at the first character of its span. A
        paragraph's opening tag is emitted at its start offset when that
        offset lies outside any table, and its closing tag is emitted only
        for a tag that was actually opened: at the paragraph's end offset, or
        at the end of the page if the end offset is not reached earlier. This
        keeps the tags balanced when a paragraph ends exactly at the page
        boundary or next to a table.
        """
        pieces = []
        added_tables = set()
        open_tag = None  # tag of the paragraph currently open, if any
        open_end = 0  # content offset at which ``open_tag`` must be closed

        for idx, table_id in enumerate(table_chars):
            position = page_offset + idx

            # Close before anything else at this position so that a closing
            # tag always precedes the next opening tag or table.
            if open_tag is not None and position >= open_end:
                pieces.append(f"</{open_tag}>")
                open_tag = None

            if table_id == -1:
                if position in paragraph_tags:
                    para_end, html_tag = paragraph_tags[position]
                    if html_tag is not None:
                        if open_tag is not None:
                            # Defensive: never leave two paragraph tags open.
                            pieces.append(f"</{open_tag}>")
                            open_tag = None
                        pieces.append(f"<{html_tag}>")
                        if para_end > position:
                            open_tag, open_end = html_tag, para_end
                        else:
                            # Zero-length paragraph: close immediately.
                            pieces.append(f"</{html_tag}>")
                pieces.append(content[position])
            elif table_id not in added_tables:
                pieces.append(self._table_to_html(tables_on_page[table_id]))
                added_tables.add(table_id)

        if open_tag is not None:
            # The paragraph ran up to (or past) the end of the page span.
            pieces.append(f"</{open_tag}>")
        pieces.append(" ")
        return "".join(pieces)

    def begin_analyze_document_from_url(
        self, source_url: str, use_layout: bool = True, paragraph_separator: str = ""
    ):
        """Analyze the document at *source_url* and return its pages as text.

        Args:
            source_url: URL of the document passed to the analysis client.
            use_layout: Selects the ``"prebuilt-layout"`` model when true and
                ``"prebuilt-read"`` otherwise.
            paragraph_separator: Not used by this method; kept so existing
                callers that pass it keep working.

        Returns:
            A list with one dict per page, holding ``"page_number"`` (the
            zero-based index of the page in the result), ``"offset"`` (the
            total length of the text of all preceding pages) and
            ``"page_text"``.

        Raises:
            ValueError: If anything fails during analysis or rendering. The
                message embeds the formatted traceback and the original
                exception is chained as ``__cause__``.
        """
        offset = 0
        page_map = []
        model_id = "prebuilt-layout" if use_layout else "prebuilt-read"

        try:
            logger.info("Method begin_analyze_document_from_url started")
            logger.info("Model ID selected: %s", model_id)
            poller = self.document_analysis_client.begin_analyze_document_from_url(
                model_id, document_url=source_url
            )
            form_recognizer_results = poller.result()

            # Treat missing collections as empty so a result without
            # paragraphs or tables still renders as plain text.
            paragraphs = form_recognizer_results.paragraphs or []
            tables = form_recognizer_results.tables or []
            content = form_recognizer_results.content

            paragraph_tags = self._paragraph_tags(paragraphs)

            for page_num, page in enumerate(form_recognizer_results.pages):
                # A table belongs to the page of its first bounding region
                # (page numbers in the result are one-based).
                tables_on_page = [
                    table
                    for table in tables
                    if table.bounding_regions
                    and table.bounding_regions[0].page_number == page_num + 1
                ]

                # Only the first span of the page is rendered.
                page_offset = page.spans[0].offset
                page_length = page.spans[0].length
                table_chars = self._table_char_map(
                    tables_on_page, page_offset, page_length
                )

                page_text = self._render_page(
                    content, page_offset, table_chars, tables_on_page, paragraph_tags
                )
                page_map.append(
                    {"page_number": page_num, "offset": offset, "page_text": page_text}
                )
                offset += len(page_text)

            return page_map
        except Exception as e:
            logger.exception("Exception in begin_analyze_document_from_url: %s", e)
            raise ValueError(f"Error: {traceback.format_exc()}. Error: {e}") from e
        finally:
            logger.info("Method begin_analyze_document_from_url ended")