# Chunk Large Documents via Ingest pipelines
<a target="_blank" href="https://colab.research.google.com/github/elastic/elasticsearch-labs/blob/main/notebooks/document-chunking/with-index-pipelines.ipynb"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

This interactive notebook will:
- load the model "sentence-transformers__all-minilm-l6-v2" from Hugging Face and into Elasticsearch ML Node
- create an index and ingest pipeline that will chunk large fields into smaller passages and vectorize them using the model
- perform a search and return docs with the most relevant passages

# Prefer the `semantic_text` field type

**Elasticsearch version 8.15 introduced the [`semantic_text`](https://www.elastic.co/guide/en/elasticsearch/reference/current/semantic-text.html) field type which handles the chunking process behind the scenes. Before continuing with this notebook, we highly recommend looking into this:**

- **<https://www.elastic.co/search-labs/blog/semantic-search-simplified-semantic-text>**
- **<https://github.com/elastic/elasticsearch-labs/blob/main/notebooks/search/09-semantic-text.ipynb>**

## Create Elastic Cloud deployment

If you don't have an Elastic Cloud deployment, sign up [here](https://cloud.elastic.co/registration?onboarding_token=vectorsearch&utm_source=github&utm_content=elasticsearch-labs-notebook) for a free trial.

Once logged in to your Elastic Cloud account, go to the [Create deployment](https://cloud.elastic.co/deployments/create) page and select **Create deployment**. Leave all settings with their default values.

## Install packages

To get started, we'll need to connect to our Elastic deployment using the Python client.
Because we're using an Elastic Cloud deployment, we'll use the **Cloud ID** to identify our deployment.

First we need to install the `elasticsearch` Python client.

In [None]:
!python3 -m pip install -qU "elasticsearch<9" "eland[pytorch]<9"

## Initialize the Elasticsearch client

Now we can instantiate the [Elasticsearch python client](https://www.elastic.co/guide/en/elasticsearch/client/python-api/current/index.html), providing the cloud id and password in your deployment.

In [None]:
from elasticsearch import Elasticsearch
from getpass import getpass
import time

# https://www.elastic.co/search-labs/tutorials/install-elasticsearch/elastic-cloud#finding-your-cloud-id
ELASTIC_CLOUD_ID = getpass("Elastic Cloud ID: ")

# https://www.elastic.co/search-labs/tutorials/install-elasticsearch/elastic-cloud#creating-an-api-key
ELASTIC_API_KEY = getpass("Elastic Api Key: ")

# Create the client instance
client = Elasticsearch(
    # For local development
    # hosts=["http://localhost:9200"]
    cloud_id=ELASTIC_CLOUD_ID,
    api_key=ELASTIC_API_KEY,
)

If you're running Elasticsearch locally or self-managed, you can pass in the Elasticsearch host instead. [Read more](https://www.elastic.co/guide/en/elasticsearch/client/python-api/current/connecting.html#_verifying_https_with_certificate_fingerprints_python_3_10_or_later) on how to connect to Elasticsearch locally.

Confirm that the client has connected with this test.

In [None]:
print(client.info())

## Load Model from hugging face
The first thing you will need is a model to create the text embeddings out of the chunks, you can use whatever you would like, but this example will run end to end on the minilm-l6-v2 model. With an Elastic Cloud cluster created or another Elasticsearch cluster ready, we can upload the text embedding model using the eland library.

In [None]:
MODEL_ID = "sentence-transformers__all-minilm-l6-v2"

!eland_import_hub_model \
    --cloud-id $ELASTIC_CLOUD_ID \
    --es-username elastic \
    --es-api-key $ELASTIC_API_KEY \
    --hub-model-id "sentence-transformers/all-MiniLM-L6-v2" \
    --task-type text_embedding \
    --clear-previous \
    --start

while True:
    status = client.ml.get_trained_models_stats(model_id=MODEL_ID)
    if "trained_model_stats" in status.keys() and status["trained_model_stats"][0]["deployment_stats"]["state"] == "started":
        print(MODEL_ID + " Model has been successfully deployed & started.")
        break
    else:
        print(MODEL_ID + " Model is currently being deployed.")
    time.sleep(5)


## Chunk and Infer in pipeline
The next step is to define an ingest pipeline to break up the text field into chunks of text stored in the passages field. This pipeline has two processors, the first script processor breaks up the text field into an array of sentences stored in the passages field via a regular expression. For further research read up on regular expression advanced features such as negative lookbehind and positive lookbehind to understand how it tries to properly split on sentence boundaries, not split on Mr. or Mrs. or Ms., and keep the punctuation with the sentence. It also tries to concatenate the sentence chunks back together as long as the total string length is under the parameter passed to the script. The next for each processor runs the text embedding model on each sentence via an inferrence processor:

In [22]:
# Setup the pipeline
CHUNK_SIZE = 400

client.ingest.put_pipeline(
    id="chunk_text_to_passages",
    processors=[
        {
            "script": {
                "description": "Chunk body_content into sentences by looking for . followed by a space",
                "lang": "painless",
                "source": """
          String[] envSplit = /((?<!M(r|s|rs)\.)(?<=\.) |(?<=\!) |(?<=\?) )/.split(ctx['text']);
          ctx['passages'] = new ArrayList();
          int i = 0;
          boolean remaining = true;
          if (envSplit.length == 0) {
            return
          } else if (envSplit.length == 1) {
            Map passage = ['text': envSplit[0]];ctx['passages'].add(passage)
          } else {
            while (remaining) {
              Map passage = ['text': envSplit[i++]];
              while (i < envSplit.length && passage.text.length() + envSplit[i].length() < params.model_limit) {passage.text = passage.text + ' ' + envSplit[i++]}
              if (i == envSplit.length) {remaining = false}
              ctx['passages'].add(passage)
            }
          }
          """,
                "params": {"model_limit": CHUNK_SIZE},
            }
        },
        {
            "foreach": {
                "field": "passages",
                "processor": {
                    "inference": {
                        "field_map": {"_ingest._value.text": "text_field"},
                        "model_id": MODEL_ID,
                        "target_field": "_ingest._value.vector",
                        "on_failure": [
                            {
                                "append": {
                                    "field": "_source._ingest.inference_errors",
                                    "value": [
                                        {
                                            "message": "Processor 'inference' in pipeline 'ml-inference-title-vector' failed with message '{{ _ingest.on_failure_message }}'",
                                            "pipeline": "ml-inference-title-vector",
                                            "timestamp": "{{{ _ingest.timestamp }}}",
                                        }
                                    ],
                                }
                            }
                        ],
                    }
                },
            }
        },
    ],
)

ObjectApiResponse({'acknowledged': True})

## Setup Index
Next step is to prepare the mappings to handle the array of sentences and vector objects that will be created during the ingest pipeline. For this particular text embedding model the dimensions are 384 and dot_product similarity will be used for nearest neighbor calculations:

In [25]:
INDEX_NAME = "chunk_passages_example"

client.indices.delete(index=INDEX_NAME, ignore_unavailable=True)

# Setup the index
client.indices.create(
    index=INDEX_NAME,
    settings={"index": {"default_pipeline": "chunk_text_to_passages"}},
    mappings={
        "dynamic": "true",
        "properties": {
            "passages": {
                "type": "nested",
                "properties": {
                    "vector": {
                        "properties": {
                            "predicted_value": {
                                "type": "dense_vector",
                                "index": True,
                                "dims": 384,
                                "similarity": "dot_product",
                            }
                        }
                    }
                },
            }
        },
    },
)

ObjectApiResponse({'acknowledged': True, 'shards_acknowledged': True, 'index': 'chunk_passages_example'})

### Add some Documents

Now we can add documents with large amounts of text in body_content and automatically have them chunked, and each chunk text embedded into vectors by the model:

In [26]:
import json
from urllib.request import urlopen
from elasticsearch import helpers

url = "https://raw.githubusercontent.com/elastic/elasticsearch-labs/main/datasets/workplace-documents.json"
docs = json.loads(urlopen(url).read())

operations = [
    {"_index": INDEX_NAME, "_id": i, "text": doc["content"], "name": doc["name"]}
    for i, doc in enumerate(docs)
]

# Add the documents to the index directly
response = helpers.bulk(
    client,
    operations,
    refresh=True,
)

## Aside: Pretty printing Elasticsearch responses

Your API calls will return hard-to-read nested JSON.
We'll create a little function called `pretty_response` to return nice, human-readable outputs from our examples.

In [27]:
def pretty_response(response):
    if len(response["hits"]["hits"]) == 0:
        print("Your search returned no results.")
    else:
        for hit in response["hits"]["hits"]:
            id = hit["_id"]
            score = hit["_score"]
            doc_title = hit["_source"]["name"]
            passage_text = ""

            for passage in hit["inner_hits"]["passages"]["hits"]["hits"]:
                passage_text += passage["fields"]["passages"][0]["text"][0] + "\n\n"

            pretty_output = f"\nID: {id}\nDoc Title: {doc_title}\nPassage Text:\n{passage_text}\nScore: {score}\n"
            print(pretty_output)
            print("---")

## Making queries

To search the data and return what chunk matched the query best you use inner_hits with the knn clause to return just that best matching chunk of the document in the hits output from the query.

Below you will see the response which returns the best document and the most relevant passage.


In [29]:
response = client.search(
    index=INDEX_NAME,
    knn={
        "inner_hits": {"size": 1, "_source": False, "fields": ["passages.text"]},
        "field": "passages.vector.predicted_value",
        "k": 3,
        "num_candidates": 100,
        "query_vector_builder": {
            "text_embedding": {
                "model_id": MODEL_ID,
                "model_text": "Whats the work from home policy?",
            }
        },
    },
)

pretty_response(response)


ID: 0
Doc Title: Work From Home Policy
Passage Text:
Effective: March 2020
Purpose

The purpose of this full-time work-from-home policy is to provide guidelines and support for employees to conduct their work remotely, ensuring the continuity and productivity of business operations during the COVID-19 pandemic and beyond.
Scope

This policy applies to all employees who are eligible for remote work as determined by their role and responsibilities.


Score: 0.85496104

---

ID: 7
Doc Title: Intellectual Property Policy
Passage Text:
This policy aims to encourage creativity and innovation while ensuring that the interests of both the company and its employees are protected.

Scope
This policy applies to all employees, including full-time, part-time, temporary, and contract employees.

Definitions
a.


Score: 0.7664343

---

ID: 4
Doc Title: Company Vacation Policy
Passage Text:
Purpose

The purpose of this vacation policy is to outline the guidelines and procedures for requesting and tak