# Document Chunking With LangChain Document Splitters
[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/elastic/elasticsearch-labs/blob/main/notebooks/document-chunking/with-langchain-splitters.ipynb)

**Using Elasticsearch Nested Dense Vector Support**

This interactive notebook will:
- load the model "sentence-transformers__all-minilm-l6-v2" from Hugging Face and into Elasticsearch ML Node
- Use LangChain splitters to chunk the passages into sentences and index them into Elasticsearch with nested dense vector
- perform a search and return docs with the most relevant passages

# Prefer the `semantic_text` field type

**Elasticsearch version 8.15 introduced the [`semantic_text`](https://www.elastic.co/guide/en/elasticsearch/reference/current/semantic-text.html) field type which handles the chunking process behind the scenes. Before continuing with this notebook, we highly recommend looking into this:**

- **<https://www.elastic.co/search-labs/blog/semantic-search-simplified-semantic-text>**
- **<https://github.com/elastic/elasticsearch-labs/blob/main/notebooks/search/09-semantic-text.ipynb>**

## Dependencies
In this notebook, we're going to use Langchain and the Elasticsearch python client.

We will also require a running Elasticsearch instance with an ML node and model deployed to it.

In [None]:
!python3 -m pip install -qU langchain langchain-community langchain-elasticsearch "elasticsearch<9" "eland<9" jq

### Connect to Elasticsearch

ℹ️ We're using an Elastic Cloud deployment of Elasticsearch for this notebook. If you don't have an Elastic Cloud deployment, sign up [here](https://cloud.elastic.co/registration?onboarding_token=vectorsearch&utm_source=github&utm_content=elasticsearch-labs-notebook) for a free trial. 

We'll use the **Cloud ID** to identify our deployment, because we are using Elastic Cloud deployment. To find the Cloud ID for your deployment, go to https://cloud.elastic.co/deployments and select your deployment.

In [None]:
from getpass import getpass

# https://www.elastic.co/search-labs/tutorials/install-elasticsearch/elastic-cloud#finding-your-cloud-id
ELASTIC_CLOUD_ID = getpass("Elastic Cloud ID: ")

# https://www.elastic.co/search-labs/tutorials/install-elasticsearch/elastic-cloud#creating-an-api-key
ELASTIC_API_KEY = getpass("Elastic Api Key: ")

In [3]:
from elasticsearch import Elasticsearch

client = Elasticsearch(cloud_id=ELASTIC_CLOUD_ID, api_key=ELASTIC_API_KEY)

### Download our example Dataset
We are going to use Langchain's tooling to ingest and split raw documents into smaller chunks. We are using our example workplace search dataset.

LangChain has a number of other loaders to ingest data from other sources. See their [core loaders](https://python.langchain.com/docs/modules/data_connection/document_loaders/) or [loaders integration](https://python.langchain.com/docs/integrations/document_loaders) for more information. 

In [4]:
from urllib.request import urlopen
import json

url = "https://raw.githubusercontent.com/elastic/elasticsearch-labs/main/datasets/workplace-documents.json"

response = urlopen(url)
data = json.load(response)

with open("temp.json", "w") as json_file:
    json.dump(data, json_file)

In [5]:
from langchain.document_loaders import JSONLoader


def metadata_func(record: dict, metadata: dict) -> dict:
    metadata["name"] = record.get("name")
    metadata["summary"] = record.get("summary")
    metadata["url"] = record.get("url")
    metadata["category"] = record.get("category")
    metadata["updated_at"] = record.get("updated_at")

    return metadata


# For more loaders https://python.langchain.com/docs/modules/data_connection/document_loaders/
# And 3rd party loaders https://python.langchain.com/docs/modules/data_connection/document_loaders/#third-party-loaders
loader = JSONLoader(
    file_path="temp.json",
    jq_schema=".[]",
    content_key="content",
    metadata_func=metadata_func,
)

## Load Model from hugging face
The first thing you will need is a model to create the text embeddings out of the chunks, you can use whatever you would like, but this example will run end to end on the minilm-l6-v2 model. With an Elastic Cloud cluster created or another Elasticsearch cluster ready, we can upload the text embedding model using the eland library.

In [None]:
MODEL_ID = "sentence-transformers__all-minilm-l6-v2"

!eland_import_hub_model \
    --cloud-id $ELASTIC_CLOUD_ID \
    --es-username elastic \
    --es-api-key $ELASTIC_API_KEY \
    --hub-model-id "sentence-transformers/all-MiniLM-L6-v2" \
    --task-type text_embedding \
    --clear-previous \
    --start

### Setting up our Elasticsearch Index
In this example we're going to use a pipeline to do the inference and store the embeddings in our index. 

In this example, we are using the sentence transformers minilm-l6-v2 model, which you will need to is running on the ML node. With this model, we are setting up an index_pipeline to do the inference and store the embeddings in our index.

In [11]:
PIPELINE_ID = "chunk_text_to_passages"
MODEL_DIMS = 384
INDEX_NAME = "nb_parent_retriever_index"

# Create the pipeline
client.ingest.put_pipeline(
    id=PIPELINE_ID,
    processors=[
        {
            "foreach": {
                "field": "passages",
                "processor": {
                    "inference": {
                        "field_map": {"_ingest._value.text": "text_field"},
                        "model_id": MODEL_ID,
                        "target_field": "_ingest._value.vector",
                        "on_failure": [
                            {
                                "append": {
                                    "field": "_source._ingest.inference_errors",
                                    "value": [
                                        {
                                            "message": "Processor 'inference' in pipeline 'ml-inference-title-vector' failed with message '{{ _ingest.on_failure_message }}'",
                                            "pipeline": "ml-inference-title-vector",
                                            "timestamp": "{{{ _ingest.timestamp }}}",
                                        }
                                    ],
                                }
                            }
                        ],
                    }
                },
            }
        }
    ],
)

# Create the index
client.indices.create(
    index=INDEX_NAME,
    settings={"index": {"default_pipeline": PIPELINE_ID}},
    mappings={
        "dynamic": "true",
        "properties": {
            "passages": {
                "type": "nested",
                "properties": {
                    "vector": {
                        "properties": {
                            "predicted_value": {
                                "type": "dense_vector",
                                "index": True,
                                "dims": MODEL_DIMS,
                                "similarity": "dot_product",
                            }
                        }
                    }
                },
            }
        },
    },
)

ObjectApiResponse({'acknowledged': True, 'shards_acknowledged': True, 'index': 'nb_parent_retriever_index'})

### Utils: Parent Child Splitter Function
This function will split a document into multiple passages, and return the parent document with the child passages. 

It also has an option to chunk the parent document into smaller documents, meaning the parent document will be split into multiple index documents. We will use this in example 2.

In [12]:
from langchain.text_splitter import RecursiveCharacterTextSplitter


def parent_child_splitter(documents, chunk_size: int = 200):
    child_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size)

    docs = []
    for i, doc in enumerate(documents):
        passages = []

        for _doc in child_splitter.split_documents([doc]):
            passages.append(
                {
                    "text": _doc.page_content,
                }
            )

        doc = {
            "content": doc.page_content,
            "metadata": doc.metadata,
            "passages": passages,
        }
        docs.append(doc)

    return docs

### Utils: Pretty Response
This function will print out the response from Elasticsearch in an easier to read format.

In [13]:
def pretty_response(response, show_parent_text=False):
    if len(response["hits"]["hits"]) == 0:
        print("Your search returned no results.")
    else:
        for hit in response["hits"]["hits"]:
            id = hit["_id"]
            score = hit["_score"]
            doc_title = hit["_source"]["metadata"]["name"]
            parent_text = ""

            if show_parent_text:
                parent_text = hit["_source"]["content"]

            passage_text = ""

            for passage in hit["inner_hits"]["passages"]["hits"]["hits"]:
                passage_text += passage["fields"]["passages"][0]["text"][0] + "\n\n"

            pretty_output = f"\nID: {id}\nDoc Title: {doc_title}\nparent text:\n{parent_text}\nPassage Text:\n{passage_text}\nScore: {score}\n"
            print(pretty_output)
            print("---")

## Full Document, nested passages
In this example we will split a document into passages, and store the full document as a parent document. We will then store the passages as nested documents, with a link back to the parent document.

Below we are using the parent child splitter to split the full documents into passages. The `parent_child_splitter` fn returns a list of documents, with an array of nested passages. 

We then index these documents into Elasticsearch. This will index the full document and the passages will be stored in a nested field. 

Our index pipeline processor will then run the inference on the passages, and store the embeddings in the index.

In [14]:
from elasticsearch import helpers

chunked_docs = parent_child_splitter(loader.load(), chunk_size=600)

count, errors = helpers.bulk(client, chunked_docs, index=INDEX_NAME)

print(f"Indexed {count} documents with {errors} errors")

import time

time.sleep(5)

Indexed 15 documents with [] errors


### Perform a Nested Search
We can now perform a nested search, to find the passages that match our query, which will be returned in `inner_hits`. In the example that follows only one passage per parent document is requested.

In [15]:
response = client.search(
    index=INDEX_NAME,
    knn={
        "inner_hits": {"size": 1, "_source": False, "fields": ["passages.text"]},
        "field": "passages.vector.predicted_value",
        "k": 4,
        "num_candidates": 100,
        "query_vector_builder": {
            "text_embedding": {
                "model_id": "sentence-transformers__all-minilm-l6-v2",
                "model_text": "Whats the work from home policy?",
            }
        },
    },
)

pretty_response(response)


ID: 1XvjyowBidHK_OJxJozM
Doc Title: Work From Home Policy
parent text:

Passage Text:
Effective: March 2020
Purpose

The purpose of this full-time work-from-home policy is to provide guidelines and support for employees to conduct their work remotely, ensuring the continuity and productivity of business operations during the COVID-19 pandemic and beyond.
Scope

This policy applies to all employees who are eligible for remote work as determined by their role and responsibilities. It is designed to allow employees to work from home full time while maintaining the same level of performance and collaboration as they would in the office.
Eligibility


Score: 0.84830964

---

ID: 3HvjyowBidHK_OJxJozM
Doc Title: Intellectual Property Policy
parent text:

Passage Text:
Purpose
The purpose of this Intellectual Property Policy is to establish guidelines and procedures for the ownership, protection, and utilization of intellectual property generated by employees during their employment. This pol

### With Langchain
We can also peform this search within Langchain with an adjustment to the query.

We also override the `doc_builder` to populate the `site_content` with the passages rather than the full document.

In [18]:
from langchain_elasticsearch import (
    ElasticsearchStore,
    DenseVectorStrategy,
)
from typing import List, Union
from langchain_core.documents import Document


class CustomRetrievalStrategy(DenseVectorStrategy):
    def es_query(
        self,
        query: Union[str, None],
        filter: List[dict],
        **kwargs,
    ):
        return {
            "knn": {
                "inner_hits": {"_source": False, "fields": ["passages.text"]},
                "field": "passages.vector.predicted_value",
                "filter": filter,
                "k": 5,
                "num_candidates": 100,
                "query_vector_builder": {
                    "text_embedding": {
                        "model_id": "sentence-transformers__all-minilm-l6-v2",
                        "model_text": query,
                    }
                },
            }
        }


vector_store = ElasticsearchStore(
    index_name=INDEX_NAME,
    es_connection=client,
    query_field="content",
    strategy=CustomRetrievalStrategy(),
)


def doc_builder(hit):
    passage_hits = (
        hit.get("inner_hits", {}).get("passages", {}).get("hits", {}).get("hits", [])
    )
    page_content = ""
    for passage_hit in passage_hits:
        passage_fields = passage_hit.get("fields", {}).get("passages", [])[0]
        page_content += passage_fields.get("text", [])[0] + "\n\n"

        return Document(
            page_content=page_content,
            metadata=hit["_source"]["metadata"],
        )


results = vector_store.similarity_search(
    query="Whats the work from home policy?", doc_builder=doc_builder
)
for result in results:
    print(f'Doc title: {result.metadata["name"]}')
    print(f"Text:\n{result.page_content}")

Doc title: Work From Home Policy
Text:
Effective: March 2020
Purpose

The purpose of this full-time work-from-home policy is to provide guidelines and support for employees to conduct their work remotely, ensuring the continuity and productivity of business operations during the COVID-19 pandemic and beyond.
Scope

This policy applies to all employees who are eligible for remote work as determined by their role and responsibilities. It is designed to allow employees to work from home full time while maintaining the same level of performance and collaboration as they would in the office.
Eligibility


Doc title: Intellectual Property Policy
Text:
Purpose
The purpose of this Intellectual Property Policy is to establish guidelines and procedures for the ownership, protection, and utilization of intellectual property generated by employees during their employment. This policy aims to encourage creativity and innovation while ensuring that the interests of both the company and its employees