notebooks/document-chunking/with-index-pipelines.ipynb (556 lines of code) (raw):

{ "cells": [ { "cell_type": "markdown", "id": "87773ce7", "metadata": { "id": "87773ce7" }, "source": [ "# Chunk Large Documents via Ingest Pipelines\n", "<a target=\"_blank\" href=\"https://colab.research.google.com/github/elastic/elasticsearch-labs/blob/main/notebooks/document-chunking/with-index-pipelines.ipynb\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>\n", "\n", "This interactive notebook will:\n", "- load the model \"sentence-transformers__all-minilm-l6-v2\" from Hugging Face into an Elasticsearch ML node\n", "- create an index and an ingest pipeline that chunks large fields into smaller passages and vectorizes them using the model\n", "- perform a search and return documents with their most relevant passages\n", "\n", "# Prefer the `semantic_text` field type\n", "\n", "**Elasticsearch version 8.15 introduced the [`semantic_text`](https://www.elastic.co/guide/en/elasticsearch/reference/current/semantic-text.html) field type, which handles the chunking process behind the scenes. Before continuing with this notebook, we highly recommend reviewing these resources:**\n", "\n", "- **<https://www.elastic.co/search-labs/blog/semantic-search-simplified-semantic-text>**\n", "- **<https://github.com/elastic/elasticsearch-labs/blob/main/notebooks/search/09-semantic-text.ipynb>**" ] }, { "cell_type": "markdown", "id": "a32202e2", "metadata": { "id": "a32202e2" }, "source": [ "## Create Elastic Cloud deployment\n", "\n", "If you don't have an Elastic Cloud deployment, sign up [here](https://cloud.elastic.co/registration?onboarding_token=vectorsearch&utm_source=github&utm_content=elasticsearch-labs-notebook) for a free trial.\n", "\n", "Once logged in to your Elastic Cloud account, go to the [Create deployment](https://cloud.elastic.co/deployments/create) page and select **Create deployment**. Leave all settings with their default values." 
] }, { "cell_type": "markdown", "id": "52a6a607", "metadata": { "id": "52a6a607" }, "source": [ "## Install packages\n", "\n", "To get started, we'll need to connect to our Elastic deployment using the Python client.\n", "Because we're using an Elastic Cloud deployment, we'll use the **Cloud ID** to identify our deployment.\n", "\n", "First we need to install the `elasticsearch` Python client and `eland`, which we use later to import the model." ] }, { "cell_type": "code", "execution_count": null, "id": "ffc5fa6f", "metadata": { "id": "ffc5fa6f" }, "outputs": [], "source": [ "!python3 -m pip install -qU \"elasticsearch<9\" \"eland[pytorch]<9\"" ] }, { "cell_type": "markdown", "id": "0241694c", "metadata": { "id": "0241694c" }, "source": [ "## Initialize the Elasticsearch client\n", "\n", "Now we can instantiate the [Elasticsearch Python client](https://www.elastic.co/guide/en/elasticsearch/client/python-api/current/index.html), providing the Cloud ID and API key for your deployment." ] }, { "cell_type": "code", "execution_count": null, "id": "f38e0397", "metadata": { "colab": { "base_uri": "https://localhost:8080/" }, "id": "f38e0397", "outputId": "ad6df489-d242-4229-a42a-39c5ca19d124" }, "outputs": [], "source": [ "from elasticsearch import Elasticsearch\n", "from getpass import getpass\n", "import time\n", "\n", "# https://www.elastic.co/search-labs/tutorials/install-elasticsearch/elastic-cloud#finding-your-cloud-id\n", "ELASTIC_CLOUD_ID = getpass(\"Elastic Cloud ID: \")\n", "\n", "# https://www.elastic.co/search-labs/tutorials/install-elasticsearch/elastic-cloud#creating-an-api-key\n", "ELASTIC_API_KEY = getpass(\"Elastic Api Key: \")\n", "\n", "# Create the client instance\n", "client = Elasticsearch(\n", " # For local development\n", " # hosts=[\"http://localhost:9200\"]\n", " cloud_id=ELASTIC_CLOUD_ID,\n", " api_key=ELASTIC_API_KEY,\n", ")" ] }, { "cell_type": "markdown", "id": "fcd165fa", "metadata": { "id": "fcd165fa" }, "source": [ "If you're running Elasticsearch locally or self-managed, you can pass in 
the Elasticsearch host instead. [Read more](https://www.elastic.co/guide/en/elasticsearch/client/python-api/current/connecting.html#_verifying_https_with_certificate_fingerprints_python_3_10_or_later) on how to connect to Elasticsearch locally." ] }, { "cell_type": "markdown", "id": "1462ebd8", "metadata": { "id": "1462ebd8" }, "source": [ "Confirm that the client has connected with this test." ] }, { "cell_type": "code", "execution_count": null, "id": "25c618eb", "metadata": { "colab": { "base_uri": "https://localhost:8080/" }, "id": "25c618eb", "outputId": "30a6ba5b-5109-4457-ddfe-5633a077ca9b" }, "outputs": [], "source": [ "print(client.info())" ] }, { "cell_type": "markdown", "id": "4e272c75", "metadata": {}, "source": [ "## Load Model from Hugging Face\n", "The first thing you will need is a model that creates text embeddings from the chunks. You can use any text embedding model you like, but this example runs end to end with the all-MiniLM-L6-v2 model. With an Elastic Cloud cluster created or another Elasticsearch cluster ready, we can upload the text embedding model using the eland library." 
] }, { "cell_type": "code", "execution_count": null, "id": "63560817", "metadata": {}, "outputs": [], "source": [ "MODEL_ID = \"sentence-transformers__all-minilm-l6-v2\"\n", "\n", "!eland_import_hub_model \\\n", " --cloud-id $ELASTIC_CLOUD_ID \\\n", " --es-username elastic \\\n", " --es-api-key $ELASTIC_API_KEY \\\n", " --hub-model-id \"sentence-transformers/all-MiniLM-L6-v2\" \\\n", " --task-type text_embedding \\\n", " --clear-previous \\\n", " --start\n", "\n", "while True:\n", " status = client.ml.get_trained_models_stats(model_id=MODEL_ID)\n", " if \"trained_model_stats\" in status.keys() and status[\"trained_model_stats\"][0][\"deployment_stats\"][\"state\"] == \"started\":\n", " print(MODEL_ID + \" Model has been successfully deployed & started.\")\n", " break\n", " else:\n", " print(MODEL_ID + \" Model is currently being deployed.\")\n", " time.sleep(5)\n" ] }, { "cell_type": "markdown", "id": "61e1e6d8", "metadata": { "id": "61e1e6d8" }, "source": [ "## Chunk and Infer in pipeline\n", "The next step is to define an ingest pipeline that breaks up the text field into chunks stored in the passages field. The pipeline has two processors. The first, a script processor, uses a regular expression to split the text field into an array of sentences. The expression relies on negative and positive lookbehind so that it splits on sentence boundaries, does not split on Mr., Mrs., or Ms., and keeps the punctuation with the sentence. The script then concatenates consecutive sentences back together for as long as the combined string length stays under the limit passed to the script as a parameter, and stores the resulting chunks in the passages field. 
The second, a foreach processor, runs the text embedding model on each chunk via an inference processor:" ] }, { "cell_type": "code", "execution_count": 22, "id": "6bc95238", "metadata": { "id": "6bc95238" }, "outputs": [ { "data": { "text/plain": [ "ObjectApiResponse({'acknowledged': True})" ] }, "execution_count": 22, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Setup the pipeline\n", "CHUNK_SIZE = 400\n", "\n", "client.ingest.put_pipeline(\n", " id=\"chunk_text_to_passages\",\n", " processors=[\n", " {\n", " \"script\": {\n", " \"description\": \"Chunk text into passages by splitting on ., ! or ? followed by a space\",\n", " \"lang\": \"painless\",\n", " \"source\": \"\"\"\n", " String[] envSplit = /((?<!M(r|s|rs)\\.)(?<=\\.) |(?<=\\!) |(?<=\\?) )/.split(ctx['text']);\n", " ctx['passages'] = new ArrayList();\n", " int i = 0;\n", " boolean remaining = true;\n", " if (envSplit.length == 0) {\n", " return\n", " } else if (envSplit.length == 1) {\n", " Map passage = ['text': envSplit[0]];ctx['passages'].add(passage)\n", " } else {\n", " while (remaining) {\n", " Map passage = ['text': envSplit[i++]];\n", " while (i < envSplit.length && passage.text.length() + envSplit[i].length() < params.model_limit) {passage.text = passage.text + ' ' + envSplit[i++]}\n", " if (i == envSplit.length) {remaining = false}\n", " ctx['passages'].add(passage)\n", " }\n", " }\n", " \"\"\",\n", " \"params\": {\"model_limit\": CHUNK_SIZE},\n", " }\n", " },\n", " {\n", " \"foreach\": {\n", " \"field\": \"passages\",\n", " \"processor\": {\n", " \"inference\": {\n", " \"field_map\": {\"_ingest._value.text\": \"text_field\"},\n", " \"model_id\": MODEL_ID,\n", " \"target_field\": \"_ingest._value.vector\",\n", " \"on_failure\": [\n", " {\n", " \"append\": {\n", " \"field\": \"_source._ingest.inference_errors\",\n", " \"value\": [\n", " {\n", " \"message\": \"Processor 'inference' in pipeline 'chunk_text_to_passages' failed with message '{{ _ingest.on_failure_message 
}}'\",\n", " \"pipeline\": \"chunk_text_to_passages\",\n", " \"timestamp\": \"{{{ _ingest.timestamp }}}\",\n", " }\n", " ],\n", " }\n", " }\n", " ],\n", " }\n", " },\n", " }\n", " },\n", " ],\n", ")" ] }, { "cell_type": "markdown", "id": "74dde574", "metadata": {}, "source": [ "## Setup Index\n", "The next step is to prepare the mappings for the array of passages and vector objects that the ingest pipeline creates. This text embedding model produces 384-dimensional vectors, and dot_product similarity is used for nearest neighbor calculations:" ] }, { "cell_type": "code", "execution_count": 25, "id": "_OAahfg-tqrf", "metadata": { "colab": { "base_uri": "https://localhost:8080/" }, "id": "_OAahfg-tqrf", "outputId": "d8f81ba4-cdc9-4e30-edf7-6d5bb16920eb" }, "outputs": [ { "data": { "text/plain": [ "ObjectApiResponse({'acknowledged': True, 'shards_acknowledged': True, 'index': 'chunk_passages_example'})" ] }, "execution_count": 25, "metadata": {}, "output_type": "execute_result" } ], "source": [ "INDEX_NAME = \"chunk_passages_example\"\n", "\n", "client.indices.delete(index=INDEX_NAME, ignore_unavailable=True)\n", "\n", "# Setup the index\n", "client.indices.create(\n", " index=INDEX_NAME,\n", " settings={\"index\": {\"default_pipeline\": \"chunk_text_to_passages\"}},\n", " mappings={\n", " \"dynamic\": \"true\",\n", " \"properties\": {\n", " \"passages\": {\n", " \"type\": \"nested\",\n", " \"properties\": {\n", " \"vector\": {\n", " \"properties\": {\n", " \"predicted_value\": {\n", " \"type\": \"dense_vector\",\n", " \"index\": True,\n", " \"dims\": 384,\n", " \"similarity\": \"dot_product\",\n", " }\n", " }\n", " }\n", " },\n", " }\n", " },\n", " },\n", ")" ] }, { "cell_type": "markdown", "id": "075f5eb6", "metadata": { "id": "075f5eb6" }, "source": [ "### Add some Documents\n", "\n", "Now we can add documents with large amounts of text in the text field and have them automatically chunked, with each chunk embedded into a vector by 
the model:" ] }, { "cell_type": "code", "execution_count": 26, "id": "008d723e", "metadata": { "id": "008d723e" }, "outputs": [], "source": [ "import json\n", "from urllib.request import urlopen\n", "from elasticsearch import helpers\n", "\n", "url = \"https://raw.githubusercontent.com/elastic/elasticsearch-labs/main/datasets/workplace-documents.json\"\n", "docs = json.loads(urlopen(url).read())\n", "\n", "operations = [\n", " {\"_index\": INDEX_NAME, \"_id\": i, \"text\": doc[\"content\"], \"name\": doc[\"name\"]}\n", " for i, doc in enumerate(docs)\n", "]\n", "\n", "# Add the documents to the index directly\n", "response = helpers.bulk(\n", " client,\n", " operations,\n", " refresh=True,\n", ")" ] }, { "cell_type": "markdown", "id": "cd8b03e0", "metadata": { "id": "cd8b03e0" }, "source": [ "## Aside: Pretty printing Elasticsearch responses\n", "\n", "Your API calls will return hard-to-read nested JSON.\n", "We'll create a little function called `pretty_response` to return nice, human-readable outputs from our examples." 
] }, { "cell_type": "code", "execution_count": 27, "id": "f12ce2c9", "metadata": { "id": "f12ce2c9" }, "outputs": [], "source": [ "def pretty_response(response):\n", " if len(response[\"hits\"][\"hits\"]) == 0:\n", " print(\"Your search returned no results.\")\n", " else:\n", " for hit in response[\"hits\"][\"hits\"]:\n", " id = hit[\"_id\"]\n", " score = hit[\"_score\"]\n", " doc_title = hit[\"_source\"][\"name\"]\n", " passage_text = \"\"\n", "\n", " for passage in hit[\"inner_hits\"][\"passages\"][\"hits\"][\"hits\"]:\n", " passage_text += passage[\"fields\"][\"passages\"][0][\"text\"][0] + \"\\n\\n\"\n", "\n", " pretty_output = f\"\\nID: {id}\\nDoc Title: {doc_title}\\nPassage Text:\\n{passage_text}\\nScore: {score}\\n\"\n", " print(pretty_output)\n", " print(\"---\")" ] }, { "cell_type": "markdown", "id": "39bdefe0", "metadata": { "id": "39bdefe0" }, "source": [ "## Making queries\n", "\n", "To search the data and return the chunk that best matches the query, use inner_hits with the knn clause. Each hit in the response then includes only the best-matching chunk of that document.\n", "\n", "The response below shows the best-matching documents, each with its most relevant passage.\n" ] }, { "cell_type": "code", "execution_count": 29, "id": "Df7hwcIjYwMT", "metadata": { "colab": { "base_uri": "https://localhost:8080/" }, "id": "Df7hwcIjYwMT", "outputId": "e63884d7-d4a5-4f5d-ea43-fc2f0793f040" }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "\n", "ID: 0\n", "Doc Title: Work From Home Policy\n", "Passage Text:\n", "Effective: March 2020\n", "Purpose\n", "\n", "The purpose of this full-time work-from-home policy is to provide guidelines and support for employees to conduct their work remotely, ensuring the continuity and productivity of business operations during the COVID-19 pandemic and beyond.\n", "Scope\n", "\n", "This policy applies to all employees who are eligible for remote work as determined by their role and 
responsibilities.\n", "\n", "\n", "Score: 0.85496104\n", "\n", "---\n", "\n", "ID: 7\n", "Doc Title: Intellectual Property Policy\n", "Passage Text:\n", "This policy aims to encourage creativity and innovation while ensuring that the interests of both the company and its employees are protected.\n", "\n", "Scope\n", "This policy applies to all employees, including full-time, part-time, temporary, and contract employees.\n", "\n", "Definitions\n", "a.\n", "\n", "\n", "Score: 0.7664343\n", "\n", "---\n", "\n", "ID: 4\n", "Doc Title: Company Vacation Policy\n", "Passage Text:\n", "Purpose\n", "\n", "The purpose of this vacation policy is to outline the guidelines and procedures for requesting and taking time off from work for personal and leisure purposes.\n", "\n", "\n", "Score: 0.725452\n", "\n", "---\n" ] } ], "source": [ "response = client.search(\n", " index=INDEX_NAME,\n", " knn={\n", " \"inner_hits\": {\"size\": 1, \"_source\": False, \"fields\": [\"passages.text\"]},\n", " \"field\": \"passages.vector.predicted_value\",\n", " \"k\": 3,\n", " \"num_candidates\": 100,\n", " \"query_vector_builder\": {\n", " \"text_embedding\": {\n", " \"model_id\": MODEL_ID,\n", " \"model_text\": \"Whats the work from home policy?\",\n", " }\n", " },\n", " },\n", ")\n", "\n", "pretty_response(response)" ] }, { "cell_type": "code", "execution_count": null, "id": "c4bbcc4b-ea2d-47a3-b475-c2eb0eebb7e2", "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "colab": { "provenance": [] }, "kernelspec": { "display_name": "Python 3.12.3 64-bit", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.12.3" }, "vscode": { "interpreter": { "hash": "b0fa6594d8f4cbf19f97940f81e996739fb7646882a419484c72d19e05852a7e" } } }, "nbformat": 4, "nbformat_minor": 5 }