# Label Section Headers

* Author: docai-incubator@google.com

## Disclaimer

This tool is not supported by the Google engineering team or product team. It is provided and supported on a best-effort basis by the **DocAI Incubator Team**. No guarantees of performance are implied.

## Objective

This tool labels headers, sub-headers and sub-sub-headers based on the font sizes of bold text in each document, and creates a new entity for each header it finds. A few parameters let you tune the tool to your document layout.

Entity types:
* `header`, `sub_header`, `sub_sub_header` - only bold text is considered for these header entities.
* `text` and `[section]_text` (for example `header_text`) - only non-bold (plain) text is considered; it is tagged with the section type that precedes it.

`sections_count` - 1, 2 or 3
* Selects the entity labels to create: (h), (h, sh) or (h, sh, ssh).

`bold_text_threshold` - range [0, 1]
* Bold font sizes whose share of bold tokens is at or below this threshold are discarded. The remaining font sizes are split into `sections_count` groups.

`footer_threshold` and `header_threshold` (set through `footer_offset` and `header_offset` in the inputs) - range [0, 1], expressed as normalized y-coordinates
* Most text in footer and header regions is bold. Using these thresholds to discard all tokens that fall in those regions improves the distribution of bold font sizes.

NOTE: When using `footer_threshold` and `header_threshold`, it is recommended to keep `bold_text_threshold` at zero so that all remaining tokens are used to create entities.
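
The interaction between `bold_text_threshold` and `sections_count` can be sketched with plain Python. This is a simplified restatement of the grouping step, not the tool's exact code: the function name `group_fonts` and the sample distribution are made up for illustration.

```python
import math
from typing import Dict, List


def group_fonts(
    dist: Dict[int, float], sections_count: int, threshold: float
) -> List[List[int]]:
    """Drop font sizes at or below `threshold`, then split the rest into groups."""
    # `dist` is assumed to be ordered from most to least frequent font size.
    kept = [size for size, share in dist.items() if share > threshold]
    per_group = math.ceil(len(kept) / sections_count)
    groups = [kept[i * per_group : (i + 1) * per_group] for i in range(sections_count)]
    # Reverse so that the least frequent sizes come first (the header level).
    return groups[::-1]


# Hypothetical distribution: font size -> share of bold tokens.
sample_dist = {10: 0.55, 12: 0.25, 14: 0.15, 18: 0.05}
print(group_fonts(sample_dist, sections_count=2, threshold=0.1))
# [[14], [10, 12]]
```

With `threshold=0.1` the rare size 18 is dropped before grouping; with `threshold=0` it would be kept and land in the first (header) group.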

## Prerequisites
* Access to a Vertex AI Notebook or Google Colab
* Python
* GCS Folder Path
* Document OCR Processor

## Step by Step procedure 

### 1. Importing Required Modules

In [None]:
!wget https://raw.githubusercontent.com/GoogleCloudPlatform/document-ai-samples/main/incubator-tools/best-practices/utilities/utilities.py

In [None]:
import math
from collections import Counter
from typing import Dict, List, Sequence, Tuple, Optional, Union

from google.api_core.client_options import ClientOptions
from google.cloud import documentai_v1beta3 as documentai
from google.cloud import storage
from utilities import file_names, store_document_as_json

### 2. Set up the inputs

In [None]:
project_id = "xx-xx-xx"
location = "us"  # Format is "us" or "eu"
processor_id = "xx-xx-xx"
processor_version = "pretrained-ocr-v2.0-2023-06-02"
mime_type = "application/pdf"
input_gcs_path = "gs://BUCKET_NAME/headers_layout_detection/samples/"
output_gcs_path = "gs://BUCKET_NAME/headers_layout_detection/output/"
# Number of header levels to label: 1 -> (h), 2 -> (h, sh), 3 -> (h, sh, ssh)
sections_count = 2
# Range 0-1. 0.1 -> bold font sizes covering less than 10% of bold tokens are discarded.
# Set to 0 if you do not want to discard any bold text.
bold_text_threshold = 0
# Range 0-1. 0.1 -> exclude tokens in the bottom 10% of the page,
# i.e. keep tokens within normalized y-coordinates 0 - 0.90
footer_offset = 0.1
# Range 0-1. 0.1 -> exclude tokens in the top 10% of the page,
# i.e. keep tokens within normalized y-coordinates 0.10 - 1.0
header_offset = 0.07

`project_id`: Google Cloud project ID

`processor_id`: Document AI OCR processor ID

`location`: Processor location

`processor_version`: OCR processor version (V2.X)

`input_gcs_path`: GCS folder path containing the input samples

`output_gcs_path`: GCS folder path where the post-processed results are stored

`mime_type`: MIME type of the input files

`sections_count`: Number of header/section levels required (1, 2 or 3)
 * 1/2/3 -> (h) or (h, sh) or (h, sh, ssh)

`bold_text_threshold`: Bold font sizes whose share falls at or below this value are discarded
 * [0 - 1] -> 0.1 means bold font sizes covering less than 10% of bold tokens are discarded

`footer_offset`: Threshold (normalized y-coordinate) for discarding tokens at the bottom of the page, i.e. y-coordinates in [1 - footer_offset, 1]
 * [0 - 1] -> 0.1 means the bottom 10% of the page is excluded

`header_offset`: Threshold (normalized y-coordinate) for discarding tokens at the top of the page, i.e. y-coordinates in [0, header_offset]
 * [0 - 1] -> 0.1 means the top 10% of the page is excluded
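
As a rough illustration of how the two offsets act on a token, the sketch below applies the same comparisons to a token's minimum and maximum normalized y-coordinates. The helper name `in_excluded_band` and the sample coordinates are assumptions made for this example only.

```python
def in_excluded_band(
    y_min: float, y_max: float, footer_offset: float, header_offset: float
) -> bool:
    """Return True when a token reaches into the header or footer band."""
    # Footer band: everything below (1 - footer_offset) on the normalized page.
    if y_max > 1 - footer_offset:
        return True
    # Header band: everything above header_offset on the normalized page.
    if y_min < header_offset:
        return True
    return False


# Hypothetical tokens given as (y_min, y_max) pairs.
print(in_excluded_band(0.02, 0.04, footer_offset=0.1, header_offset=0.07))  # True
print(in_excluded_band(0.40, 0.42, footer_offset=0.1, header_offset=0.07))  # False
print(in_excluded_band(0.95, 0.97, footer_offset=0.1, header_offset=0.07))  # True
```

Setting both offsets to 0 keeps every token whose coordinates lie within the page.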


### 3. Run the required functions

In [None]:
def process_document_ocr_sample(
    project_id: str,
    location: str,
    processor_id: str,
    processor_version: str,
    pdf_bytes: bytes,
    mime_type: str,
) -> documentai.Document:
    """
    Processes a PDF document using the Document AI OCR processor.

    Args:
        project_id (str): The Google Cloud project ID.
        location (str): The location where the Document AI processor is hosted.
        processor_id (str): The ID of the Document AI processor to use.
        processor_version (str): The version of the processor to use (e.g., "pretrained-ocr-v2.0-2023-06-02").
        pdf_bytes (bytes): The PDF file content as a byte stream.
        mime_type (str): The MIME type of the file (e.g., 'application/pdf').

    Returns:
        documentai.Document: A Document object containing the OCR-processed results,
                             including text and layout information from the document.
    """

    client_opts = ClientOptions(api_endpoint=f"{location}-documentai.googleapis.com")
    client = documentai.DocumentProcessorServiceClient(client_options=client_opts)
    name = client.processor_version_path(
        project_id, location, processor_id, processor_version
    )
    raw_doc = documentai.RawDocument(content=pdf_bytes, mime_type=mime_type)
    premium_features = documentai.OcrConfig.PremiumFeatures(compute_style_info=True)
    ocr_config = documentai.OcrConfig(
        enable_native_pdf_parsing=False, premium_features=premium_features
    )
    process_options = documentai.ProcessOptions(ocr_config=ocr_config)
    request = documentai.ProcessRequest(
        name=name,
        raw_document=raw_doc,
        # Only supported for Document OCR processor
        process_options=process_options,
    )
    result = client.process_document(request=request)
    return result.document


def get_headers(
    tokens: Sequence[documentai.Document.Page.Token],
    fonts_list: List[int],
    is_bold: bool = True,
    footer_threshold: float = 0.05,
    header_threshold: float = 0.05,
):
    """
    Filters and retrieves header tokens from a sequence of document tokens based on font size, boldness,
    and threshold settings for headers and footers.

    Args:
        tokens (Sequence[documentai.Document.Page.Token]): A sequence of token objects from the document.
        fonts_list (List[int]): A list of font sizes that are considered for header detection.
        is_bold (bool): A flag to filter tokens based on whether they are bold or not. Defaults to True (bold).
        footer_threshold (float): The vertical position threshold for discarding tokens as footers. Defaults to 0.05.
        header_threshold (float): The vertical position threshold for discarding tokens as headers. Defaults to 0.05.

    Returns:
        List[documentai.Document.Page.Token]: A list of tokens that qualify as headers based on the specified conditions.
    """
    headers = []
    for token in tokens:
        token: documentai.Document.Page.Token = token
        if is_token_need_to_discard(token, footer_threshold, header_threshold):
            continue
        font_size = token.style_info.font_size
        bold = token.style_info.bold
        if is_bold:
            if font_size in fonts_list and bold:
                headers.append(token)
        else:
            if font_size in fonts_list and not bold:
                headers.append(token)
    return headers


def get_groups(
    headers: List[documentai.Document.Page.Token],
) -> List[List[documentai.Document.Page.Token]]:
    """
    Groups tokens (headers) based on their text anchor indices.

    Args:
        headers (List[documentai.Document.Page.Token]): List of token headers.

    Returns:
        List[List[documentai.Document.Page.Token]]: A list of grouped token headers.
    """

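    group = []
    groups = []
    for idx, header in enumerate(headers[:-1]):
        curr_ts = header.layout.text_anchor.text_segments[0]
        next_ts = headers[idx + 1].layout.text_anchor.text_segments[0]
        curr = curr_ts.start_index, curr_ts.end_index
        nxt = next_ts.start_index, next_ts.end_index
        # Count the distinct indices across both tokens.
        distinct = len({*curr, *nxt})
        if distinct == 3:
            # One index is shared (typically current end == next start),
            # so the next token continues the same group.
            group.append(header)
        if distinct == 4:
            # No shared index: the current token closes its group.
            group.append(header)
            groups.append(group)
            group = []
    # The loop never breaks, so the last token is always appended to the
    # final group. Callers must pass a non-empty list.
    group.append(headers[-1])
    groups.append(group)
    return groups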


def get_new_entity_attrs(
    tokens: List[documentai.Document.Page.Token],
) -> Tuple[List[int], List[int], List[float], List[float], List[float]]:
    """
    Extracts attributes from a list of tokens, including start and end indices, x and y coordinates, and confidences.

    Args:
        tokens (List[Document.Page.Token]): List of tokens from which to extract attributes.

    Returns:
        Tuple[List[int], List[int], List[float], List[float], List[float]]:
            - List of start indices (si) for the tokens.
            - List of end indices (ei) for the tokens.
            - List of x coordinates for the token bounding polygons.
            - List of y coordinates for the token bounding polygons.
            - List of confidence scores for each token's layout.
    """
    si = []
    ei = []
    x, y = [], []
    confidences = []
    for token in tokens:
        ts = token.layout.text_anchor.text_segments[0]
        si.append(ts.start_index)
        ei.append(ts.end_index)
        token_x, token_y = get_x_y_list(token.layout.bounding_poly)
        x.extend(token_x)
        y.extend(token_y)
        confidences.append(token.layout.confidence)
    return si, ei, x, y, confidences


def get_x_y_list(
    bounding_poly: documentai.BoundingPoly,
) -> Tuple[List[float], List[float]]:
    """
    Extracts the x and y coordinates from a BoundingPoly's normalized vertices.

    Args:
        bounding_poly (BoundingPoly): A bounding polygon object that contains the normalized vertices.

    Returns:
        Tuple[List[float], List[float]]:
            - A list of x coordinates.
            - A list of y coordinates.
    """
    x, y = [], []
    for nvs in bounding_poly.normalized_vertices:
        x.append(nvs.x)
        y.append(nvs.y)
    return x, y


def get_normalized_vertices(
    x: List[float], y: List[float]
) -> List[documentai.NormalizedVertex]:
    """
    Creates a list of normalized vertices (x, y coordinates) based on the bounding box
    formed by the minimum and maximum values from the input x and y coordinates.

    Args:
        x (List[float]): A list of x coordinates.
        y (List[float]): A list of y coordinates.

    Returns:
        List[NormalizedVertex]: A list of four `NormalizedVertex` objects representing
        the corners of the bounding box defined by the input x and y coordinates.
    """
    nvs = []
    xy = [[min(x), min(y)], [max(x), min(y)], [max(x), max(y)], [min(x), max(y)]]
    for _x, _y in xy:
        nv = documentai.NormalizedVertex(x=_x, y=_y)
        nvs.append(nv)
    return nvs


def create_headers_ents(
    ent_type: str,
    text: str,
    page_num: int,
    groups: List[List[documentai.Document.Page.Token]],
) -> List[documentai.Document.Entity]:
    """
    Creates entities for header sections from token groups.

    Args:
        ent_type (str): The type of entity being created.
        text (str): The full text of the document.
        page_num (int): The page number where the headers are found.
        groups (List[List[Document.Page.Token]]): A list of groups of tokens representing headers.

    Returns:
        List[Document.Entity]: A list of Document.Entity objects created from the token groups.
    """
    entities = []
    for tokens in groups:
        si, ei, x, y, confidences = get_new_entity_attrs(tokens)
        _confidence = sum(confidences) / len(confidences)
        _type = ent_type
        _ts = documentai.Document.TextAnchor.TextSegment(
            start_index=min(si), end_index=max(ei)
        )
        _text_anchor = documentai.Document.TextAnchor(text_segments=[_ts])
        _mention_text = text[_ts.start_index : _ts.end_index]
        _bounding_poly = documentai.BoundingPoly()
        _bounding_poly.normalized_vertices = get_normalized_vertices(x, y)
        _page_ref = documentai.Document.PageAnchor.PageRef(
            page=page_num, bounding_poly=_bounding_poly
        )
        _page_anchor = documentai.Document.PageAnchor(page_refs=[_page_ref])
        entity = documentai.Document.Entity(
            confidence=_confidence,
            type_=_type,
            mention_text=_mention_text,
            text_anchor=_text_anchor,
            page_anchor=_page_anchor,
        )
        entities.append(entity)
    return entities


def is_token_need_to_discard(
    token: documentai.Document.Page.Token,
    footer_threshold: float = 0.05,
    header_threshold: float = 0.05,
) -> bool:
    """
    Determines whether a token should be discarded based on its position
    relative to header and footer thresholds.

    Args:
        token (Document.Page.Token): The token to evaluate.
        footer_threshold (float): The threshold for the footer position (0.0 to 1.0).
        header_threshold (float): The threshold for the header position (0.0 to 1.0).

    Returns:
        bool: True if the token should be discarded, False otherwise.
    """
    footer_threshold = 1 - footer_threshold
    bp = token.layout.bounding_poly
    _, y = get_x_y_list(bp)
    y_max = max(y, default=1)
    y_min = min(y, default=0)
    # to discard footer-section
    if y_max > footer_threshold:
        return True
    # to discard top header-section
    if y_min < header_threshold:
        return True
    return False


# to get document wise fonts distribution
def get_fonts_distribution(
    doc: documentai.Document,
    is_bold: bool = True,
    footer_threshold: float = 0.05,
    header_threshold: float = 0.05,
) -> dict:
    """
    Analyzes the font distribution in a Document object.

    Args:
        doc (Document): The Document object containing pages and tokens.
        is_bold (bool): Flag to include only bold fonts if True.
        footer_threshold (float): The threshold for footer position (0.0 to 1.0).
        header_threshold (float): The threshold for header position (0.0 to 1.0).

    Returns:
        dict: A dictionary containing font sizes as keys and their frequency
              distribution as values, sorted in descending order of frequency.
    """
    fonts = []
    for page in doc.pages:
        for token in page.tokens:
            if is_token_need_to_discard(token, footer_threshold, header_threshold):
                continue
            if is_bold:
                if token.style_info and token.style_info.bold:
                    fonts.append(token.style_info.font_size)
            else:
                fonts.append(token.style_info.font_size)
    font_freqs = Counter(fonts)
    print(f"\tbold = {is_bold} tokens count - {len(fonts)}")
    count = sum(font_freqs.values())
    fonts_freq_dist = {k: v / count for k, v in font_freqs.items()}
    fonts_freq_dist = sorted(
        fonts_freq_dist.items(), key=lambda item: item[1], reverse=True
    )
    fonts_freq_dist = dict(fonts_freq_dist)
    return fonts_freq_dist


def bag_fonts_desc(
    fonts_dist: Dict[str, float], no_of_groups: int, threshold: float = 0.1
) -> List[List[str]]:
    """
    Groups font sizes based on their frequency distribution.

    Args:
        fonts_dist (Dict[str, float]): A dictionary containing font sizes as keys
                                         and their frequency distribution as values.
        no_of_groups (int): The number of groups to create.
        threshold (float): The minimum frequency a font size must have to be included.
                           Defaults to 0.1.

    Returns:
        List[List[str]]: A list of groups, where each group is a list of font sizes.
    """
    fonts_dist = {k: v for k, v in fonts_dist.items() if v > threshold}
    fonts = list(fonts_dist.keys())
    fonts_cnt = len(fonts)
    fonts_per_group = math.ceil(fonts_cnt / no_of_groups)
    groups = []
    for idx in range(no_of_groups):
        start = idx * fonts_per_group
        # end = start + fonts_per_group
        end = (idx + 1) * fonts_per_group
        group = fonts[start:end]
        groups.append(group)
    # h, sh, ssh
    return groups[::-1]


def adjust_empty_header_font_tags(
    fonts_list: List[Union[List[str], None]]
) -> List[List[str]]:
    """
    Adjusts the font tags for headers based on the number of sections present.

    Args:
        fonts_list (List[Union[List[str], None]]): A list containing font sections
                                                     for headers, where each section
                                                     is a list of font sizes or None.

    Returns:
        List[List[str]]: A list of three sections, each a list of font sizes,
                         ensuring that empty sections are added as needed.
    """
    counter = 0
    updated_list = []
    for section in fonts_list:
        if section:
            counter += 1
            updated_list.append(section)
    if counter == 3:
        return updated_list
    elif counter == 2:
        updated_list.append([])
        return updated_list
    elif counter == 1:
        updated_list.extend([[], []])
        return updated_list
    elif not counter:
        # to handle if page has no tokens
        return [[], [], []]
    return fonts_list


def tag_text_with_section_type(doc: documentai.Document) -> documentai.Document:
    """
    Tags entities in a Document with their respective section types.

    Args:
        doc (documentai.Document): The Document object containing entities to be tagged.

    Returns:
        documentai.Document: The updated Document object with tagged entities.
    """

    updated_entities = []

    def ent_sort_key(ent: documentai.Document.Entity) -> tuple:
        """
        Sort key for entities based on page number and y-coordinate.

        Args:
            ent (documentai.Document.Entity): The entity to extract sorting keys from.

        Returns:
            tuple: A tuple containing the page number and the y-coordinate.
        """
        pg_no = ent.page_anchor.page_refs[0].page
        y_coord = ent.page_anchor.page_refs[0].bounding_poly.normalized_vertices[0].y
        return pg_no, y_coord

    pre_ent_type = ""
    old_entities = sorted(doc.entities, key=ent_sort_key)
    for ent in old_entities:
        if ent.type_ == "text" and pre_ent_type not in ("", "text"):
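            # Remove a trailing "_text" suffix explicitly: str.rstrip() strips a
            # set of characters rather than a suffix, so it is unsafe here.
            base_type = pre_ent_type
            if base_type.endswith("_text"):
                base_type = base_type[: -len("_text")]
            ent.type_ = f"{base_type}_text"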
        pre_ent_type = ent.type_
        updated_entities.append(ent)
    doc.entities = updated_entities
    return doc
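
The grouping idea behind `get_groups` can be illustrated without Document AI objects. The sketch below is a simplified restatement, not the function above: it treats each token as a hypothetical `(start_index, end_index)` pair and merges consecutive tokens when one ends exactly where the next begins.

```python
from typing import List, Tuple

Span = Tuple[int, int]  # (start_index, end_index) of a token in the document text


def group_adjacent(spans: List[Span]) -> List[List[Span]]:
    """Merge consecutive spans where one ends exactly where the next begins."""
    if not spans:
        return []
    groups = [[spans[0]]]
    for prev, curr in zip(spans, spans[1:]):
        if prev[1] == curr[0]:
            # Contiguous in the text: extend the current group.
            groups[-1].append(curr)
        else:
            # Gap in the text: start a new group.
            groups.append([curr])
    return groups


# Hypothetical header tokens: two two-token headers and one single-token header.
spans = [(0, 8), (8, 15), (40, 47), (47, 52), (90, 96)]
print(group_adjacent(spans))
# [[(0, 8), (8, 15)], [(40, 47), (47, 52)], [(90, 96)]]
```

Each resulting group corresponds to one header entity, whose text span runs from the smallest start index to the largest end index in the group.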

### 4. Run the code

In [None]:
file_list, files_dict = file_names(input_gcs_path)
input_bucket = input_gcs_path.split("/")[2]
output_splits = output_gcs_path.split("/")
output_bucket = output_splits[2]
output_dir = "/".join(output_splits[3:]).strip("/")
storage_client = storage.Client()
bucket = storage_client.get_bucket(input_bucket)
for fn, fp in files_dict.items():
    print(f"File: {fn}")
    pdf_bytes = bucket.blob(fp).download_as_bytes()
    try:
        res = process_document_ocr_sample(
            project_id, location, processor_id, processor_version, pdf_bytes, mime_type
        )
    except Exception as e:
        print(f"Unable to parse document due to {type(e)}, {str(e)}")
        continue
    text = res.text
    bold_dist = get_fonts_distribution(res, True, footer_offset, header_offset)
    plain_dist = get_fonts_distribution(res, False, footer_offset, header_offset)
    # Group bold font sizes into `sections_count` levels, e.g. 3 -> headers, sub_headers, sub_sub_headers
    bagged_fonts = bag_fonts_desc(
        bold_dist, sections_count, threshold=bold_text_threshold
    )
    # print(f"\th, sh, ssh = {bagged_fonts}")
    # to move all empty fonts lists to end
    h_fonts, sh_fonts, ssh_fonts = bag = adjust_empty_header_font_tags(bagged_fonts)
    print(f"\th, sh, ssh = {bag}")
    text_fonts = list(plain_dist.keys())
    print("\tentities", len(res.entities), end=" ")
    for page in res.pages:
        page_num = page.page_number
        ent_page_num = page_num - 1
        headers = get_headers(page.tokens, h_fonts, True, footer_offset, header_offset)
        sub_headers = get_headers(
            page.tokens, sh_fonts, True, footer_offset, header_offset
        )
        sub_sub_headers = get_headers(
            page.tokens, ssh_fonts, True, footer_offset, header_offset
        )
        text_headers = get_headers(
            page.tokens, text_fonts, False, footer_offset, header_offset
        )
        if headers:
            header_groups = get_groups(headers)
            header_ents = create_headers_ents(
                "header", text, ent_page_num, header_groups
            )
            res.entities.extend(header_ents)
        if sub_headers:
            sub_header_groups = get_groups(sub_headers)
            sub_header_ents = create_headers_ents(
                "sub_header", text, ent_page_num, sub_header_groups
            )
            res.entities.extend(sub_header_ents)
        if sub_sub_headers:
            sub_sub_header_groups = get_groups(sub_sub_headers)
            sub_sub_header_ents = create_headers_ents(
                "sub_sub_header", text, ent_page_num, sub_sub_header_groups
            )
            res.entities.extend(sub_sub_header_ents)
        if text_headers:
            text_header_groups = get_groups(text_headers)
            text_ents = create_headers_ents(
                "text", text, ent_page_num, text_header_groups
            )
            res.entities.extend(text_ents)
    print("\tentities", len(res.entities))
    res = tag_text_with_section_type(res)
    json_str = documentai.Document.to_json(res, including_default_value_fields=False)
    out_fn = fn.rsplit(".", 1)[0]
    out_fp = f"{output_dir}/{out_fn}.json"
    print(f"\tWriting data to gs://{output_bucket}/{out_fp}")
    store_document_as_json(json_str, output_bucket, out_fp)
print("Process Completed!!!")
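
The final tagging pass, which renames plain `text` entities after the section that precedes them, can be sketched with dictionaries in place of Document AI entities. The keys `type`, `page` and `y` and the sample entities are assumptions for this illustration.

```python
from typing import Dict, List


def tag_text_types(entities: List[Dict]) -> List[Dict]:
    """Rename "text" entities to "<section>_text" using the preceding section type."""
    # Reading order: page first, then vertical position on the page.
    ordered = sorted(entities, key=lambda e: (e["page"], e["y"]))
    previous_type = ""
    for ent in ordered:
        if ent["type"] == "text" and previous_type not in ("", "text"):
            base = previous_type
            if base.endswith("_text"):
                base = base[: -len("_text")]
            ent["type"] = f"{base}_text"
        previous_type = ent["type"]
    return ordered


# Hypothetical entities on a single page, deliberately out of order.
sample = [
    {"type": "text", "page": 0, "y": 0.5},
    {"type": "header", "page": 0, "y": 0.1},
    {"type": "text", "page": 0, "y": 0.2},
    {"type": "sub_header", "page": 0, "y": 0.6},
    {"type": "text", "page": 0, "y": 0.7},
]
print([e["type"] for e in tag_text_types(sample)])
# ['header', 'header_text', 'header_text', 'sub_header', 'sub_header_text']
```

Text that appears before any header on the first page keeps the plain `text` type, because there is no preceding section to inherit from.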

### 5. Output

The code classifies text elements in a document as headers, sub-headers or sub-sub-headers. It does this by comparing the font size of bold tokens against the font-size groups derived for each header type, and it creates a new entity for each header it identifies. Non-bold text is labeled as `text` or `[section]_text`.
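
One quick way to sanity-check a result is to count the entity types in an output JSON file. The sketch below assumes that each entity's type is serialized under a `type` key inside a top-level `entities` list; the sample document is made up, and in practice the string would be read from the output file.

```python
import json
from collections import Counter


def count_entity_types(document_json: str) -> Counter:
    """Count how many entities of each type a serialized document contains."""
    doc = json.loads(document_json)
    return Counter(ent.get("type", "") for ent in doc.get("entities", []))


# Hypothetical, heavily trimmed output document.
sample_json = json.dumps(
    {
        "entities": [
            {"type": "header"},
            {"type": "header_text"},
            {"type": "sub_header"},
            {"type": "header"},
        ]
    }
)
print(count_entity_types(sample_json))
```

A document with no header entities at all may indicate that `bold_text_threshold` or the offsets are filtering out too many tokens.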

<img src="./Images/tool_ouput.png"></img>