chunking/chunkers/doc_analysis_chunker.py (135 lines of code) (raw):
import logging
import os
import re
from langchain_text_splitters import MarkdownTextSplitter, RecursiveCharacterTextSplitter
from .base_chunker import BaseChunker
from ..exceptions import UnsupportedFormatError
from tools import DocumentIntelligenceClient
class DocAnalysisChunker(BaseChunker):
"""
DocAnalysisChunker class is responsible for analyzing and splitting document content into chunks
based on specific format criteria, utilizing the Document Intelligence service for content analysis.
Format Support:
---------------
The DocAnalysisChunker class leverages the Document Intelligence service to process and analyze
a wide range of document formats. The class ensures that document content is accurately processed
and divided into manageable chunks.
- Supported Formats: The chunker processes document formats supported by the Document Intelligence client.
- Unsupported Formats: If a document's format is not supported by the client, an `UnsupportedFormatError` is raised.
Chunking Parameters:
--------------------
- max_chunk_size: The maximum size of each chunk in tokens. This value is sourced from the `NUM_TOKENS`
environment variable, with a default of 2048 tokens.
- token_overlap: The number of overlapping tokens between consecutive chunks, sourced from the `TOKEN_OVERLAP`
environment variable, with a default of 100 tokens.
- minimum_chunk_size: The minimum size of each chunk in tokens, sourced from the `MIN_CHUNK_SIZE` environment
variable, with a default of 100 tokens.
Document Analysis:
------------------
- The document is analyzed using the Document Intelligence service, extracting its content and structure.
- The analysis process includes identifying the number of pages and providing a preview of the content.
- If the document is large, a warning is logged to indicate potential timeout issues during processing.
Content Chunking:
-----------------
- The document content is split into chunks using format-specific strategies.
- HTML tables in the content are replaced with placeholders during the chunking process to simplify splitting.
- After chunking, the original content, such as HTML tables, is restored in place of the placeholders.
- The chunking process also manages page numbering based on the presence of page breaks, ensuring each chunk
is correctly associated with its corresponding page.
Error Handling:
---------------
- The class includes comprehensive error handling during document analysis, such as managing unsupported formats
and handling general exceptions.
- The chunking process's progress and outcomes, including the number of chunks created or skipped, are logged.
"""
def __init__(self, data, max_chunk_size=None, minimum_chunk_size=None, token_overlap=None):
super().__init__(data)
self.max_chunk_size = max_chunk_size or int(os.getenv("NUM_TOKENS", "2048"))
self.minimum_chunk_size = minimum_chunk_size or int(os.getenv("MIN_CHUNK_SIZE", "100"))
self.token_overlap = token_overlap or int(os.getenv("TOKEN_OVERLAP", "100"))
self.docint_client = DocumentIntelligenceClient()
self.supported_formats = self.docint_client.file_extensions
def get_chunks(self):
"""
Analyzes the document and generates content chunks based on the analysis.
This method performs the following steps:
1. Checks if the document format is supported.
2. Analyzes the document using the Document Intelligence Client with a retry mechanism.
3. Processes the document content into chunks based on the analysis.
Returns:
list: A list of dictionaries, each representing a chunk of the document content.
Raises:
UnsupportedFormatError: If the document format is not supported.
Exception: If there is an error during document analysis.
"""
if self.extension not in self.supported_formats:
raise UnsupportedFormatError(f"[doc_analysis_chunker] {self.extension} format is not supported")
logging.info(f"[doc_analysis_chunker][{self.filename}] Running get_chunks.")
document, analysis_errors = self._analyze_document_with_retry()
if analysis_errors:
formatted_errors = ', '.join(map(str, analysis_errors))
raise Exception(f"Error in doc_analysis_chunker analyzing {self.filename}: {formatted_errors}")
chunks = self._process_document_chunks(document)
return chunks
def _analyze_document_with_retry(self, retries=3):
"""
Analyzes the document using the Document Intelligence Client, with a retry mechanism for error handling.
Args:
retries (int): The number of times to retry the document analysis in case of failure. Defaults to 3 retries.
Returns:
tuple: A tuple containing the analysis result and any errors encountered.
Raises:
Exception: If the document analysis fails after the specified number of retries.
"""
for attempt in range(retries):
try:
document, analysis_errors = self.docint_client.analyze_document_from_bytes(file_bytes=self.document_bytes, filename=self.filename)
return document, analysis_errors
except Exception as e:
logging.error(f"[doc_analysis_chunker][{self.filename}] docint analyze document failed on attempt {attempt + 1}/{retries}: {str(e)}")
if attempt == retries - 1:
raise
return None, None, None
def _process_document_chunks(self, document):
"""
Processes the analyzed document content into manageable chunks.
Args:
document (dict): The analyzed document content provided by the Document Intelligence Client.
Returns:
list: A list of dictionaries, where each dictionary represents a processed chunk of the document content.
The method performs the following steps:
1. Prepares the document content for chunking, including numbering page breaks.
2. Splits the content into chunks using a chosen splitting strategy.
3. Iterates through the chunks, determining their page numbers and creating chunk representations.
4. Skips chunks that do not meet the minimum size requirement.
5. Logs the number of chunks created and skipped.
"""
chunks = []
document_content = document['content']
document_content = self._number_pagebreaks(document_content)
text_chunks = self._chunk_content(document_content)
chunk_id = 0
skipped_chunks = 0
current_page = 1
for text_chunk, num_tokens in text_chunks:
current_page = self._update_page(text_chunk, current_page)
chunk_page = self._determine_chunk_page(text_chunk, current_page)
if num_tokens >= self.minimum_chunk_size:
chunk_id += 1
chunk = self._create_chunk(
chunk_id=chunk_id,
content=text_chunk,
page=chunk_page
)
chunks.append(chunk)
else:
skipped_chunks += 1
logging.debug(f"[doc_analysis_chunker][{self.filename}] {len(chunks)} chunk(s) created")
if skipped_chunks > 0:
logging.debug(f"[doc_analysis_chunker][{self.filename}] {skipped_chunks} chunk(s) skipped")
return chunks
def _chunk_content(self, content):
"""
Splits the document content into chunks based on the specified format and criteria.
Yields:
tuple: A tuple containing the chunked content and the number of tokens in the chunk.
"""
splitter = self._choose_splitter()
chunks = splitter.split_text(content)
for chunked_content in chunks:
chunk_size = self.token_estimator.estimate_tokens(chunked_content)
yield chunked_content, chunk_size
def _replace_html_tables(self, content):
"""
Replaces HTML tables in the content with placeholders.
Args:
content (str): The document content.
Returns:
tuple: The content with placeholders and a list of the original tables.
"""
table_pattern = r"(<table[\s\S]*?</table>)"
tables = re.findall(table_pattern, content, re.IGNORECASE)
placeholders = [f"__TABLE_{i}__" for i in range(len(tables))]
for placeholder, table in zip(placeholders, tables):
content = content.replace(table, placeholder)
return content, placeholders, tables
def _restore_original_tables(self, chunks, placeholders, tables):
"""
Restores original tables in the chunks from placeholders.
Args:
chunks (list): The list of text chunks.
placeholders (list): The list of table placeholders.
tables (list): The list of original tables.
Returns:
list: The list of chunks with original tables restored.
"""
for placeholder, table in zip(placeholders, tables):
chunks = [chunk.replace(placeholder, table) for chunk in chunks]
return chunks
def _choose_splitter(self):
"""
Chooses the appropriate splitter based on document format.
Returns:
object: The splitter to use for chunking.
"""
if self.docint_client.output_content_format == "markdown":
return MarkdownTextSplitter.from_tiktoken_encoder(
chunk_size=self.max_chunk_size,
chunk_overlap=self.token_overlap
)
else:
separators = [".", "!", "?"] + [" ", "\n", "\t"]
return RecursiveCharacterTextSplitter.from_tiktoken_encoder(
separators=separators,
chunk_size=self.max_chunk_size,
chunk_overlap=self.token_overlap
)
def _number_pagebreaks(self, content):
"""
Finds and numbers all PageBreaks in the content.
Args:
content (str): The document content.
Returns:
str: Content with numbered PageBreaks.
"""
pagebreaks = re.findall(r'<!-- PageBreak -->', content)
for i, _ in enumerate(pagebreaks, 1):
content = content.replace('<!-- PageBreak -->', f'<!-- PageBreak{str(i).zfill(5)} -->', 1)
return content
def _update_page(self, content, current_page):
"""
Updates the current page number based on the content.
Args:
content (str): The content chunk being processed.
current_page (int): The current page number.
Returns:
int: The updated current page number.
"""
matches = re.findall(r'PageBreak(\d{5})', content)
if matches:
page_number = int(matches[-1])
if page_number >= current_page:
current_page = page_number + 1
return current_page
def _determine_chunk_page(self, content, current_page):
"""
Determines the chunk page number based on the position of the PageBreak element.
Args:
content (str): The content chunk being processed.
current_page (int): The current page number.
Returns:
int: The page number for the chunk.
"""
match = re.search(r'PageBreak(\d{5})', content)
if match:
page_number = int(match.group(1))
position = match.start() / len(content)
# Determine the chunk_page based on the position of the PageBreak element
if position < 0.5:
chunk_page = page_number + 1
else:
chunk_page = page_number
else:
chunk_page = current_page
return chunk_page
def _truncate_chunk(self, text):
"""
Truncates and normalizes the text to ensure it fits within the maximum chunk size.
This method first cleans up the text by removing unnecessary spaces and line breaks.
If the text still exceeds the maximum token limit, it iteratively truncates the text
until it fits within the limit.
This method overrides the parent class's method because it includes logic to retain
PageBreaks within the truncated text.
Args:
text (str): The text to be truncated and normalized.
Returns:
str: The truncated and normalized text.
"""
# Clean up text (e.g. line breaks)
text = re.sub(r'\s+', ' ', text).strip()
text = re.sub(r'[\n\r]+', ' ', text).strip()
page_breaks = re.findall(r'PageBreak\d{5}', text)
# Truncate if necessary
if self.token_estimator.estimate_tokens(text) > self.max_chunk_size:
logging.info(f"[doc_analysis_chunker][{self.filename}] token limit reached, truncating...")
step_size = 1 # Initial step size
iteration = 0 # Iteration counter
while self.token_estimator.estimate_tokens(text) > self.max_chunk_size:
# Truncate the text
text = text[:-step_size]
iteration += 1
# Increase step size exponentially every 5 iterations
if iteration % 5 == 0:
step_size = min(step_size * 2, 100)
# Reinsert page breaks and recheck size
for page_break in page_breaks:
page_break_text = f" <!-- {page_break} -->"
if page_break not in text:
# Calculate the size needed for the page break addition
needed_size = self.token_estimator.estimate_tokens(page_break_text)
# Truncate exactly the size needed to accommodate the page break
while self.token_estimator.estimate_tokens(text) + needed_size > self.max_chunk_size:
text = text[:-1] # Remove one character at a time
# Now add the page break
text += page_break_text
return text