In [None]:
!pip install sycamore-ai[elasticsearch]
# Install the Sycamore document ETL library: https://github.com/aryn-ai/sycamore

In [None]:
import os
import sycamore
from sycamore.context import ExecMode
from sycamore.transforms.partition import ArynPartitioner
from sycamore.transforms.extract_schema import LLMPropertyExtractor
from sycamore.transforms.summarize_images import SummarizeImages, LLMImageSummarizer
from sycamore.transforms.standardizer import (
    USStateStandardizer,
    DateTimeStandardizer,
    ignore_errors,
)
from sycamore.transforms.merge_elements import GreedySectionMerger
from sycamore.functions.tokenizer import HuggingFaceTokenizer
from sycamore.transforms.embed import SentenceTransformerEmbedder
from sycamore.llms import OpenAI, OpenAIModels

import pyarrow.fs

# LLM handle reused further down for property extraction and image summaries.
llm = OpenAI(OpenAIModels.GPT_4O_MINI)

# Respect an ARYN_API_KEY that is already exported in the environment; only
# fall back to the placeholder (replace it with your own key) when it is unset.
# Unconditionally assigning the placeholder would clobber a real key.
os.environ.setdefault("ARYN_API_KEY", "<MY-ARYN-API-KEY>")

# Source PDFs to ingest.
paths = ["s3://aryn-public/ntsb/"]

context = sycamore.init()
# Add exec_mode=ExecMode.LOCAL to .init to run without Ray
docset = context.read.binary(paths=paths, binary_format="pdf")
# Checkpoint the raw documents on local disk. With MATERIALIZE_USE_STORED a
# re-run is expected to reuse what is stored under `path` (see Sycamore docs).
docset = docset.materialize(
    path="./elasticsearch-tutorial/downloaded-docset",
    source_mode=sycamore.MATERIALIZE_USE_STORED,
)
# Make sure your Aryn token is accessible in the environment variable ARYN_API_KEY
partitioned_docset = docset.partition(
    partitioner=ArynPartitioner(extract_table_structure=True, extract_images=True)
).materialize(
    path="./elasticsearch-tutorial/partitioned-docset",
    source_mode=sycamore.MATERIALIZE_USE_STORED,
)
# Run the pipeline up to this point so the checkpoints above get populated.
partitioned_docset.execute()

In [None]:
# Schema label and field list handed to the LLM-based property extractor.
schema_name = "FlightAccidentReport"
schema = dict(
    type="object",
    properties={
        "accidentNumber": dict(type="string"),
        "dateAndTime": dict(type="date"),
        "location": dict(
            type="string",
            description="US State where the incident occured",
        ),
        "aircraft": dict(type="string"),
        "aircraftDamage": dict(type="string"),
        "injuries": dict(type="string"),
        "definingEvent": dict(type="string"),
    },
    required=["accidentNumber", "dateAndTime", "location", "aircraft"],
)

# Extractor configured with the shared LLM, the schema above and
# num_of_elements=20.
property_extractor = LLMPropertyExtractor(
    llm=llm,
    schema_name=schema_name,
    schema=schema,
    num_of_elements=20,
)

# Stage 1: extract the properties described by the schema defined earlier.
docset_with_properties = partitioned_docset.extract_properties(
    property_extractor=property_extractor
)
# Stage 2: summarize the extracted images using an LLM.
enriched_docset = docset_with_properties.transform(
    SummarizeImages, summarizer=LLMImageSummarizer(llm=llm)
)

# Stage 3: convert state abbreviations to their full names. ignore_errors
# wraps the standardizer call for each document.
state_standardized_docset = enriched_docset.map(
    lambda document: ignore_errors(
        document, USStateStandardizer, ["properties", "entity", "location"]
    )
)
# Stage 4: convert the extracted date/time into a common format.
formatted_docset = state_standardized_docset.map(
    lambda document: ignore_errors(
        document, DateTimeStandardizer, ["properties", "entity", "dateAndTime"]
    )
)


# Single source of truth for the embedding model. The same identifier drives
# the tokenizer that sizes the chunks and the embedder that encodes them, so
# the token budget below is measured with the embedding model's own tokenizer.
model_name = "thenlper/gte-small"

# Greedily merge elements into section chunks of at most 512 tokens.
merger = GreedySectionMerger(
    tokenizer=HuggingFaceTokenizer(model_name),
    max_tokens=512,
)
chunked_docset = formatted_docset.merge(merger=merger)

embedded_docset = (
    # Spread the "entity" and "path" properties, then explode the DocSet
    # (presumably so each chunk carries them as its own document; see the
    # Sycamore docs) before embedding.
    chunked_docset.spread_properties(["entity", "path"])
    .explode()
    .embed(
        embedder=SentenceTransformerEmbedder(batch_size=10_000, model_name=model_name)
    )
)

# Checkpoint the embedded chunks on local disk. With MATERIALIZE_USE_STORED a
# re-run is expected to reuse what is stored under `path` (see Sycamore docs).
embedded_docset = embedded_docset.materialize(
    path="./elasticsearch-tutorial/embedded-docset",
    source_mode=sycamore.MATERIALIZE_USE_STORED,
)
# Run the pipeline so the checkpoint above gets populated.
embedded_docset.execute()

In [None]:
# Write to a persistent Elasticsearch index. Note: an Elasticsearch instance must be running at `url` for this to work.
# For more information on how to set one up, refer to https://www.elastic.co/guide/en/elasticsearch/reference/current/install-elasticsearch.html

url = "http://localhost:9200"
index_name = "aryn-demo"
# Vector size produced by the embedding model chosen earlier
# (thenlper/gte-small emits 384-dimensional embeddings). It must match the
# "dims" of the dense_vector mapping; update it if the model changes.
dimensions = 384
# The password is read from the ELASTIC_PASSWORD environment variable; replace
# <YOUR-USERNAME> with your Elasticsearch user.
embedded_docset.write.elasticsearch(
    url=url,
    index_name=index_name,
    es_client_args={"basic_auth": ("<YOUR-USERNAME>", os.getenv("ELASTIC_PASSWORD"))},
    mappings={
        "properties": {
            "embeddings": {
                "type": "dense_vector",
                "dims": dimensions,
                "index": True,
                "similarity": "cosine",
            },
            "properties": {"type": "object"},
        }
    },
)

In [None]:
# Verify data has been loaded using DocSet Query to retrieve chunks
# match_all selects every document in the index.
query_params = {"match_all": {}}
# Read back through the Sycamore context created by sycamore.init() (bound to
# `context`), using the same url, index and credentials as the write step.
query_docs = context.read.elasticsearch(
    url=url,
    index_name=index_name,
    query=query_params,
    es_client_args={"basic_auth": ("<YOUR-USERNAME>", os.getenv("ELASTIC_PASSWORD"))},
)
# Display the retrieved documents without their embedding vectors.
query_docs.show(show_embedding=False)