In [None]:
# Install/upgrade this notebook's Python dependencies into the current Jupyter kernel.
# `{sys.executable}` is interpolated so pip runs under the interpreter backing this
# kernel rather than whichever `pip` happens to be first on PATH.
import sys

!{sys.executable} -m pip install --upgrade pip
!{sys.executable} -m pip install --upgrade -r /home/jupyter/paper_summarization/requirements.txt

In [None]:
import os
import threading
from typing import Dict, List, Sequence, Set, Tuple

import numpy as np
import pandas as pd
import wget

# Data Cleanup
- The dataset is incomplete: some paper PDFs have no companion summary, and some summaries have no PDF
- Use sets to find the non-matching files, download every missing PDF that has a summary, and archive the remaining unmatched files

In [None]:
def load_files_into_set(directory: str, target_extension: str) -> Set:
    """
    Return the base names (extension stripped) of the files directly inside
    `directory` whose extension is exactly `target_extension`.

    Subdirectories are not descended into, and a missing directory yields an
    empty set instead of raising.

    Args:
        directory: Directory to scan (non-recursive).
        target_extension: Extension to keep, including the leading dot,
            e.g. ".pdf". Matching is case-sensitive so the returned names can
            be recombined with the same extension to rebuild valid paths.

    Returns:
        Set of file names without their extension.
    """
    # `next(os.walk(...))` yields only the top level; the default tuple makes a
    # missing directory behave like an empty one.
    _, _, file_names = next(os.walk(directory), (None, None, []))

    output_files = set()
    for file in file_names:
        filename, extension = os.path.splitext(file)
        # Exact comparison: a substring test (`in`) would also accept
        # extensions such as ".pdfx" or ".txt~".
        if extension == target_extension:
            output_files.add(filename)

    return output_files


def find_matches(pdf_directory: str, txt_directory: str) -> Tuple[Set, Set, Set]:
    """
    Pair paper PDFs with summary .txt files by base file name.

    Returns:
        A 3-tuple of sets of base names:
        (present in both directories, PDF only, summary only).
    """
    papers = load_files_into_set(pdf_directory, ".pdf")
    summaries = load_files_into_set(txt_directory, ".txt")

    both = papers.intersection(summaries)
    pdf_only = papers.difference(summaries)
    txt_only = summaries.difference(papers)
    return both, pdf_only, txt_only


def filter_non_matches(
    non_matches: Set, input_directory: str, output_directory: str, extension: str
):
    """
    Move every unmatched file `<name><extension>` from `input_directory` into
    `output_directory`, keeping its file name.

    `output_directory` is created when needed, so the first run against a
    fresh data tree does not fail with FileNotFoundError.

    Args:
        non_matches: Base names (without extension) of the files to move.
        input_directory: Directory the files currently live in.
        output_directory: Archive directory to move them into.
        extension: Extension appended to each name, e.g. ".pdf".
    """
    if not non_matches:
        return

    os.makedirs(output_directory, exist_ok=True)

    for file in non_matches:
        file_name = f"{file}{extension}"
        os.rename(
            os.path.join(input_directory, file_name),
            os.path.join(output_directory, file_name),
        )

In [None]:
# Root of the local working data tree.
data_directory = "/home/jupyter/paper_summarization/data"
# Paper PDFs and their companion summary .txt files, paired by base file name.
pdf_directory = f"{data_directory}/pdf"
txt_directory = f"{data_directory}/summary_txt"
# json_directory = f"{data_directory}/json"

# Newline-separated list of base names already submitted to Document AI;
# read before batching so those documents are not submitted again.
cache_file = f"{data_directory}/cache.txt"
# Local copy of the Document AI batch output (Document JSON shards).
sorted_json_directory = f"{data_directory}/json"
# NOTE(review): not referenced in the visible notebook.
unsharded_json_directory = f"{data_directory}/unsharded_json"

# Plain text extracted from the Document JSON output, one .txt per paper.
full_txt_directory = f"{data_directory}/full_txt"

# Archive for files without a counterpart (PDF without summary or vice versa).
non_match_directory = f"{data_directory}/non_match"
non_match_pdf_directory = f"{non_match_directory}/pdf"
non_match_txt_directory = f"{non_match_directory}/summary_txt"

# Base URL that missing PDFs are downloaded from as f"{acl_site}/{name}.pdf".
acl_site = "https://aclanthology.org"

In [None]:
from concurrent.futures import ThreadPoolExecutor

# Upper bound on simultaneous downloads, so a large backlog does not open
# hundreds of connections at once.
MAX_DOWNLOAD_WORKERS = 8


def _download_pdf(url: str, path: str) -> bool:
    """Download `url` to `path`; print and return False on failure instead of raising."""
    try:
        # bar=None disables wget's progress bar, which garbles the output when
        # several downloads run concurrently.
        wget.download(url, path, bar=None)
        return True
    except Exception as e:
        print(f"Unable to download {url}: {e}")
        return False


matches, non_matches_pdf, non_matches_txt = find_matches(pdf_directory, txt_directory)

# Download PDFs of papers that have a summary but no PDF yet.
# Exceptions raised inside a worker thread never reach a try/except placed
# around thread creation, so error handling lives in `_download_pdf`.
# Leaving the `with` block waits for every download, so the re-scan below
# sees the complete set of files.
with ThreadPoolExecutor(max_workers=MAX_DOWNLOAD_WORKERS) as executor:
    for file in sorted(non_matches_txt):
        url = f"{acl_site}/{file}.pdf"
        path = f"{pdf_directory}/{file}.pdf"
        print(f"\nDownloading {url}")
        executor.submit(_download_pdf, url, path)

# After downloading pdf matches, search for missing matches and sort out
matches, non_matches_pdf, non_matches_txt = find_matches(pdf_directory, txt_directory)

# Archive everything that is still unmatched; make sure the archive exists first.
os.makedirs(non_match_pdf_directory, exist_ok=True)
os.makedirs(non_match_txt_directory, exist_ok=True)
filter_non_matches(non_matches_pdf, pdf_directory, non_match_pdf_directory, ".pdf")
filter_non_matches(non_matches_txt, txt_directory, non_match_txt_directory, ".txt")

## Upload Files to GCS

In [None]:
# Upload the paired summaries and PDFs to GCS (`-m` parallel copy, `-n` skip
# objects that already exist at the destination).
# NOTE(review): the destination is the shared `cloud-samples-data` bucket;
# writing there presumably needs special permissions — confirm before re-running.
!gsutil -m cp -r -n {txt_directory}/*.txt gs://cloud-samples-data/documentai/ScisummNet/summary_txt/
!gsutil -m cp -r -n {pdf_directory}/*.pdf gs://cloud-samples-data/documentai/ScisummNet/pdf/

# Send Documents to Document AI for OCR Processing

In [None]:
from time import sleep

from google.api_core.client_options import ClientOptions
from google.api_core.operation import Operation
from google.cloud.documentai import BatchDocumentsInputConfig
from google.cloud.documentai import BatchProcessMetadata
from google.cloud.documentai import BatchProcessRequest
from google.cloud.documentai import Document
from google.cloud.documentai import DocumentOutputConfig
from google.cloud.documentai import DocumentProcessorServiceClient
from google.cloud.documentai import GcsDocument
from google.cloud.documentai import GcsDocuments
from google.cloud.documentai import GcsPrefix
from google.cloud.documentai import Processor
from google.cloud.documentai import ProcessorType
from google.cloud.documentai import ProcessRequest
from google.cloud.documentai import RawDocument
from google.cloud.storage import Blob
from google.cloud.storage import Client
from google.protobuf.json_format import ParseError

import os

# See https://cloud.google.com/document-ai/docs/file-types
PDF_MIME_TYPE = "application/pdf"
JSON_MIME_TYPE = "application/json"

# Input MIME types accepted when building batches; blobs of any other type are skipped.
ACCEPTED_MIME_TYPES = {
    PDF_MIME_TYPE,
    "image/jpeg",
    "image/png",
    "image/tiff",
    "image/gif",
    "image/bmp",
    "image/webp",
}

# Based on https://cloud.google.com/document-ai/quotas
# Maximum number of documents per batch request.
BATCH_MAX_FILES = 50
# Number of batch requests submitted before pausing for TIMEOUT seconds.
BATCH_MAX_REQUESTS = 5

SKIP_HUMAN_REVIEW = True
# Seconds to sleep after every BATCH_MAX_REQUESTS submitted requests.
TIMEOUT = 200

# NOTE(review): not referenced in the visible notebook.
CONFIDENCE_THRESHOLD = 0.5

# Project- and user-specific settings can be overridden through environment
# variables of the same name, so the notebook runs in another project without
# code edits. The defaults are the original hardcoded values.
GCS_INPUT_BUCKET = os.environ.get("GCS_INPUT_BUCKET", "cloud-samples-data")
GCS_INPUT_PREFIX = os.environ.get("GCS_INPUT_PREFIX", "documentai/ScisummNet/pdf")

GCS_OUTPUT_BUCKET = os.environ.get("GCS_OUTPUT_BUCKET", "holtskinner-test-datasets")
GCS_OUTPUT_PREFIX = os.environ.get("GCS_OUTPUT_PREFIX", "ScisummNet/output")

DOCAI_PROJECT_ID = os.environ.get("DOCAI_PROJECT_ID", "908687846511")
DOCAI_LOCATION = os.environ.get("DOCAI_LOCATION", "us")
DOCAI_PROCESSOR_DISPLAY_NAME = "Paper OCR Processor"
DOCAI_PROCESSOR_TYPE = "OCR_PROCESSOR"

# Regional endpoint matching DOCAI_LOCATION.
CLIENT_OPTIONS = ClientOptions(
    api_endpoint=f"{DOCAI_LOCATION}-documentai.googleapis.com"
)

## Prepare Data in GCS
- Document AI batch processing reads its input documents from GCS and writes its results back to GCS
- Quotas limit both the number of concurrent batch processing requests and the number of documents per request, so the input is split into batches

In [None]:
def create_gcs_uri(bucket_name: str, object_name: str) -> str:
    """
    Build a ``gs://<bucket>/<object>`` URI from a bucket name and an object
    name or prefix.
    """
    return "gs://{}/{}".format(bucket_name, object_name)


def file_exists(blob_name: str, existing_files: Set) -> bool:
    """
    Report whether the base name of `blob_name` (its last path component with
    the final extension removed) is contained in `existing_files`.
    """
    stem, _ = os.path.splitext(os.path.basename(blob_name))
    return stem in existing_files


def load_existing_files(path=None) -> Set:
    """
    Load the base names recorded in the processed-files cache.

    Args:
        path: Cache file to read (one base name per line). Defaults to the
            notebook-level `cache_file`.

    Returns:
        Set of base names, ignoring blank lines. An empty set when the cache
        file does not exist yet, e.g. on the very first run.
    """
    if path is None:
        path = cache_file

    try:
        with open(path, "r") as file:
            return {line.rstrip() for line in file if line.strip()}
    except FileNotFoundError:
        # No cache yet means nothing has been processed.
        return set()


def create_batches(
    input_bucket: str,
    input_prefix: str,
    batch_size: int = BATCH_MAX_FILES,
) -> List[List[GcsDocument]]:
    """
    Group the not-yet-processed documents under `gs://input_bucket/input_prefix`
    into batches of at most `batch_size` documents.

    Blobs whose content type is not in ACCEPTED_MIME_TYPES are skipped with a
    message. Documents whose base name is already in the processed-files cache
    are skipped silently.

    Args:
        input_bucket: GCS bucket to list.
        input_prefix: Object prefix to list within the bucket.
        batch_size: Maximum documents per batch, between 1 and BATCH_MAX_FILES.

    Returns:
        List of non-empty batches; an empty list when there is nothing to process.

    Raises:
        ValueError: If `batch_size` is outside 1..BATCH_MAX_FILES.
    """
    if not 1 <= batch_size <= BATCH_MAX_FILES:
        raise ValueError(
            f"Batch size must be between 1 and {BATCH_MAX_FILES}. "
            f"You provided {batch_size}"
        )

    storage_client = Client()
    blob_list = storage_client.list_blobs(input_bucket, prefix=input_prefix)

    batches: List[List[GcsDocument]] = []
    batch: List[GcsDocument] = []

    existing_files = load_existing_files()

    for blob in blob_list:
        if blob.content_type not in ACCEPTED_MIME_TYPES:
            print(f"Invalid Mime Type {blob.content_type} - Skipping file {blob.name}")
            continue

        if file_exists(blob.name, existing_files):
            continue

        batch.append(
            GcsDocument(
                gcs_uri=create_gcs_uri(input_bucket, blob.name),
                mime_type=blob.content_type,
            )
        )

        # Close the batch as soon as it is full.
        if len(batch) == batch_size:
            batches.append(batch)
            batch = []

    # Keep the trailing partial batch only if it has documents, so the batch
    # count reported by callers is accurate.
    if batch:
        batches.append(batch)
    return batches


def add_batch_to_cache_file(batch: List[GcsDocument]):
    """
    Append the base name (file name without extension) of every document in
    `batch` to the cache file, one per line, so later runs can skip them.
    """
    names = (
        os.path.splitext(os.path.basename(document.gcs_uri))[0] for document in batch
    )
    with open(cache_file, "a") as cache:
        cache.writelines(f"{name}\n" for name in names)

## Create Processor

In [None]:
def create_processor(
    project_id: str, location: str, processor_display_name: str, processor_type: str
) -> Processor:
    """
    Create a new Document AI processor and return it.

    Args:
        project_id: Google Cloud project ID or number.
        location: Processor location, e.g. "us".
        processor_display_name: Display name of the new processor.
        processor_type: Processor type, e.g. "OCR_PROCESSOR".

    Returns:
        The created Processor as returned by the API.
    """
    client = DocumentProcessorServiceClient(client_options=CLIENT_OPTIONS)
    new_processor = Processor(display_name=processor_display_name, type_=processor_type)
    # Parent resource of the form projects/{project_id}/locations/{location}
    location_path = client.common_location_path(project_id, location)
    return client.create_processor(parent=location_path, processor=new_processor)

## Batch Process Documents with Document AI

In [None]:
def batch_process(
    processor_name: str,
    document_batch: List[GcsDocument],
    gcs_output_uri: str,
    skip_human_review: bool = SKIP_HUMAN_REVIEW,
) -> Operation:
    """
    Submit one batch of GCS documents to a Document AI processor.

    Args:
        processor_name: Full resource name of the processor.
        document_batch: Documents (GCS URI + MIME type) to process.
        gcs_output_uri: GCS URI prefix the results are written under.
        skip_human_review: Forwarded as the request's `skip_human_review` flag.

    Returns:
        The long-running Operation returned by `batch_process_documents`.
    """
    request = BatchProcessRequest(
        name=processor_name,
        input_documents=BatchDocumentsInputConfig(
            gcs_documents=GcsDocuments(documents=document_batch)
        ),
        document_output_config=DocumentOutputConfig(
            gcs_output_config=DocumentOutputConfig.GcsOutputConfig(
                gcs_uri=gcs_output_uri
            )
        ),
        skip_human_review=skip_human_review,
    )

    client = DocumentProcessorServiceClient(client_options=CLIENT_OPTIONS)
    return client.batch_process_documents(request)


def batch_process_directory(
    processor_name: str,
    gcs_input_bucket: str,
    gcs_input_prefix: str,
    gcs_output_bucket: str,
    gcs_output_prefix: str,
) -> List[Operation]:
    """
    Submit every unprocessed document under the input prefix to Document AI,
    batch by batch, writing results under the output prefix.

    After every BATCH_MAX_REQUESTS submitted requests, this sleeps for TIMEOUT
    seconds before submitting more, as a coarse way to stay within the
    concurrent batch request quota.

    Args:
        processor_name: Full resource name of the processor to use.
        gcs_input_bucket: Bucket holding the input documents.
        gcs_input_prefix: Object prefix of the input documents.
        gcs_output_bucket: Bucket the results are written to.
        gcs_output_prefix: Object prefix the results are written under.

    Returns:
        The submitted long-running operations, in submission order.
    """
    # Empty batches carry no work and must not count towards the throttle.
    batches = [
        batch for batch in create_batches(gcs_input_bucket, gcs_input_prefix) if batch
    ]
    total_batches = len(batches)

    gcs_output_uri = create_gcs_uri(gcs_output_bucket, gcs_output_prefix)

    operations: List[Operation] = []
    for i, batch in enumerate(batches):
        print(f"Processing batch {i + 1}/{total_batches}: {len(batch)} documents")

        operation = batch_process(processor_name, batch, gcs_output_uri)
        # Record the batch only after the request was accepted. A failed
        # submission then gets retried on the next run instead of being
        # skipped forever.
        add_batch_to_cache_file(batch)
        operations.append(operation)

        print(f"Operation: {operation.operation.name}")

        submitted = i + 1
        if submitted % BATCH_MAX_REQUESTS == 0 and submitted < total_batches:
            # NOTE(review): a fixed sleep does not guarantee the in-flight
            # operations have finished; waiting on them (operation.result())
            # would enforce the quota more strictly.
            print("Waiting...")
            sleep(TIMEOUT)

    return operations
            # operation.result(timeout=TIMEOUT)

## Run Processing Workflow

In [None]:
# Create Processor
processor = create_processor(
    DOCAI_PROJECT_ID, DOCAI_LOCATION, DOCAI_PROCESSOR_DISPLAY_NAME, DOCAI_PROCESSOR_TYPE
)
processor_name = processor.name
print(f"Created Processor {processor_name}")

# Process Full Directory of Documents
batch_process_directory(
    processor.name,
    GCS_INPUT_BUCKET,
    GCS_INPUT_PREFIX,
    GCS_OUTPUT_BUCKET,
    GCS_OUTPUT_PREFIX,
)

# Post-Processing

## Download Processed Files

In [None]:
# Copy the Document AI batch output (Document JSON shards) from GCS into the
# local json directory; `-n` skips files that already exist locally.
!gsutil -m cp -r -n {create_gcs_uri(GCS_OUTPUT_BUCKET, GCS_OUTPUT_PREFIX)}/* {sorted_json_directory}/

## Extract from `Document.json`
- Combine the sharded Document JSON output files of each document
- Extract the document text and save it to `full_txt/<document>.txt`

In [None]:
import threading


def extract_text(root: str, files: List, output_directory: str):
    """
    Concatenate the text of one document's Document JSON shards into a single
    `<document>.txt` file in `output_directory`.

    Shard files are expected to be named `<document>-<shard index>.json`
    (e.g. `paper-0.json`, `paper-1.json`, ...). They are ordered by the numeric
    shard index rather than lexicographically, so `paper-10.json` comes after
    `paper-9.json`. Files that do not match this pattern are ignored.

    Args:
        root: Directory containing `files`.
        files: File names (not paths) found in `root`; assumed to belong to a
            single document — TODO confirm against the batch output layout.
        output_directory: Directory the `.txt` file is written to.

    Returns:
        None. Does nothing when there are no shard files or the output file
        already exists.
    """
    import re

    shard_pattern = re.compile(r"^(?P<name>.+)-(?P<index>\d+)\.json$")

    # (shard index, document name, file name) so sorting orders numerically.
    shards = []
    for file in files:
        match = shard_pattern.match(file)
        if match:
            shards.append((int(match.group("index")), match.group("name"), file))

    if not shards:
        return

    shards.sort()
    document_name = f"{shards[0][1]}.txt"
    output_file = os.path.join(output_directory, document_name)

    if os.path.exists(output_file):
        print(f"Skipping {document_name}")
        return

    print(f"Writing {document_name}")

    document_text_shards: List[str] = []
    for _, _, file in shards:
        with open(os.path.join(root, file), "r", encoding="utf-8") as doc_json:
            document = Document.from_json(doc_json.read(), ignore_unknown_fields=True)
        document_text_shards.append(document.text)

    # Explicit UTF-8: OCR text is not ASCII-safe and must not depend on the locale.
    with open(output_file, "w", encoding="utf-8") as full_txt_file:
        full_txt_file.write("".join(document_text_shards))

In [None]:
from concurrent.futures import ThreadPoolExecutor

# Upper bound on concurrent extraction workers.
EXTRACT_MAX_WORKERS = 8

# Extract raw text from every document's JSON shards into full_txt/<document>.txt.
# Each non-empty leaf directory of the batch output is handed to extract_text.
os.makedirs(full_txt_directory, exist_ok=True)

with ThreadPoolExecutor(max_workers=EXTRACT_MAX_WORKERS) as executor:
    futures = [
        executor.submit(extract_text, root, files, full_txt_directory)
        for root, dirs, files in os.walk(sorted_json_directory)
        # Only leaf directories hold shard files; empty ones have nothing to do.
        if not dirs and files
    ]

# Leaving the `with` block waited for every worker, so the upload cell below
# sees all files. Report failures instead of losing them in worker threads.
for future in futures:
    error = future.exception()
    if error is not None:
        print(f"Extraction failed: {error!r}")

In [None]:
# Upload the extracted full-text files to GCS; `-n` skips objects that already exist.
!gsutil -m cp -r -n {full_txt_directory}/*.txt gs://cloud-samples-data/documentai/ScisummNet/full_txt/

## Print OCR Data
- Print OCR data from a Document object: page dimensions, detected languages, paragraphs, blocks, lines, and tokens

In [None]:
def print_page_dimensions(dimension: Document.Page.Dimension) -> None:
    """Print the width and height of a page, one per indented line."""
    for label, value in (("Width", dimension.width), ("Height", dimension.height)):
        print(f"    {label}: {str(value)}")


def print_detected_languages(
    detected_languages: Sequence[Document.Page.DetectedLanguage],
) -> None:
    """
    Print every language detected on a page with its confidence as a percentage.

    Args:
        detected_languages: Detected-language entries of a page.
    """
    print("    Detected languages:")
    for lang in detected_languages:
        print(f"        {lang.language_code} ({lang.confidence:.1%} confidence)")


# Backward-compatible alias for the original, misspelled public name.
print_detected_langauges = print_detected_languages


def print_paragraphs(paragraphs: Sequence[Document.Page.Paragraph], text: str) -> None:
    """
    Print the paragraph count and the text of the first and last paragraph.

    An empty sequence prints only the count instead of raising IndexError.

    Args:
        paragraphs: Paragraphs of a page.
        text: Full document text that the layouts' offsets refer to.
    """
    print(f"    {len(paragraphs)} paragraphs detected:")
    if not paragraphs:
        return
    first_paragraph_text = layout_to_text(paragraphs[0].layout, text)
    print(f"        First paragraph text: {repr(first_paragraph_text)}")
    last_paragraph_text = layout_to_text(paragraphs[-1].layout, text)
    print(f"        Last paragraph text: {repr(last_paragraph_text)}")


def print_blocks(blocks: Sequence[Document.Page.Block], text: str) -> None:
    """
    Print the block count and the text of the first and last block.

    An empty sequence prints only the count instead of raising IndexError.

    Args:
        blocks: Blocks of a page.
        text: Full document text that the layouts' offsets refer to.
    """
    print(f"    {len(blocks)} blocks detected:")
    if not blocks:
        return
    first_block_text = layout_to_text(blocks[0].layout, text)
    print(f"        First text block: {repr(first_block_text)}")
    last_block_text = layout_to_text(blocks[-1].layout, text)
    print(f"        Last text block: {repr(last_block_text)}")


def print_lines(lines: Sequence[Document.Page.Line], text: str) -> None:
    """
    Print the line count and the text of the first and last line.

    An empty sequence prints only the count instead of raising IndexError.

    Args:
        lines: Lines of a page.
        text: Full document text that the layouts' offsets refer to.
    """
    print(f"    {len(lines)} lines detected:")
    if not lines:
        return
    first_line_text = layout_to_text(lines[0].layout, text)
    print(f"        First line text: {repr(first_line_text)}")
    last_line_text = layout_to_text(lines[-1].layout, text)
    print(f"        Last line text: {repr(last_line_text)}")


def print_tokens(tokens: Sequence[Document.Page.Token], text: str) -> None:
    """
    Print the token count, plus the text and detected break type of the first
    and last token.

    An empty sequence prints only the count instead of raising IndexError.

    Args:
        tokens: Tokens of a page.
        text: Full document text that the layouts' offsets refer to.
    """
    print(f"    {len(tokens)} tokens detected:")
    if not tokens:
        return
    first_token_text = layout_to_text(tokens[0].layout, text)
    first_token_break_type = tokens[0].detected_break.type_.name
    print(f"        First token text: {repr(first_token_text)}")
    print(f"        First token break type: {repr(first_token_break_type)}")
    last_token_text = layout_to_text(tokens[-1].layout, text)
    last_token_break_type = tokens[-1].detected_break.type_.name
    print(f"        Last token text: {repr(last_token_text)}")
    print(f"        Last token break type: {repr(last_token_break_type)}")


def layout_to_text(layout: Document.Page.Layout, text: str) -> str:
    """
    Resolve a layout's text anchor into the text it refers to.

    Document AI identifies text by offsets into the full document text. One
    element can be split over several segments (e.g. when it spans multiple
    lines), so the slices of all segments are concatenated in order.
    """
    return "".join(
        text[int(segment.start_index) : int(segment.end_index)]
        for segment in layout.text_anchor.text_segments
    )


def print_document_ocr_data(document: Document):
    """
    Print the full text and page count of `document`, then each page's
    dimensions, detected languages, paragraphs, blocks, lines, and tokens.
    """
    full_text = document.text
    print(f"Full document text: {full_text}\n")
    print(f"There are {len(document.pages)} page(s) in this document.\n")

    for page in document.pages:
        print(f"Page {page.page_number}:")
        print_page_dimensions(page.dimension)
        print_detected_langauges(page.detected_languages)
        for printer, elements in (
            (print_paragraphs, page.paragraphs),
            (print_blocks, page.blocks),
            (print_lines, page.lines),
            (print_tokens, page.tokens),
        ):
            printer(elements, full_text)