example-apps/chatbot-rag-app/data/index_data.py (130 lines of code) (raw):
import json
import os
from sys import stdout
import time
from halo import Halo
from warnings import warn
from elasticsearch import (
ApiError,
Elasticsearch,
NotFoundError,
BadRequestError,
)
from elastic_transport._exceptions import ConnectionTimeout
from langchain.docstore.document import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_elasticsearch import ElasticsearchStore
# Global variables
# Modify these if you want to use a different file, index or model.
# Every value is read from the environment; where os.getenv is given a second
# argument, that value is the fallback used when the variable is unset.
INDEX = os.getenv("ES_INDEX", "workplace-app-docs")  # index the documents are written to
FILE = os.getenv("FILE", f"{os.path.dirname(__file__)}/data.json")  # defaults to data.json next to this module
ELASTICSEARCH_URL = os.getenv("ELASTICSEARCH_URL")  # no fallback: None when unset
ELASTICSEARCH_USER = os.getenv("ELASTICSEARCH_USER")  # used with ELASTICSEARCH_PASSWORD for basic auth
ELASTICSEARCH_PASSWORD = os.getenv("ELASTICSEARCH_PASSWORD")
ELASTICSEARCH_API_KEY = os.getenv("ELASTICSEARCH_API_KEY")  # only consulted when ELASTICSEARCH_USER is unset/empty
ELSER_MODEL = os.getenv("ELSER_MODEL", ".elser_model_2")  # trained model id used for sparse vectors
# Build the shared Elasticsearch client used by every function in this module.
# Credentials are mandatory: fail at import time when neither a user nor an
# API key has been configured.
if not (ELASTICSEARCH_USER or ELASTICSEARCH_API_KEY):
    raise ValueError(
        "Please provide either ELASTICSEARCH_USER or ELASTICSEARCH_API_KEY"
    )

# Basic auth takes precedence over the API key when both are set.
if ELASTICSEARCH_USER:
    es = Elasticsearch(
        hosts=[ELASTICSEARCH_URL],
        basic_auth=(ELASTICSEARCH_USER, ELASTICSEARCH_PASSWORD),
    )
else:
    es = Elasticsearch(hosts=[ELASTICSEARCH_URL], api_key=ELASTICSEARCH_API_KEY)
def install_elser():
    """Make sure ELSER_MODEL exists in the cluster and has a running deployment.

    The script may be re-run (ctrl-c, or simply started twice), so each step
    first checks whether its work has already been done.
    """

    def definition_complete():
        # Ask only for the definition status of the model and report whether
        # the first returned config is marked as fully defined.
        response = es.ml.get_trained_models(
            model_id=ELSER_MODEL, include="definition_status"
        )
        return response["trained_model_configs"][0]["fully_defined"]

    # Step 1: create the model when the cluster does not know it yet, then
    # poll once per second until its definition is complete.
    try:
        es.ml.get_trained_models(model_id=ELSER_MODEL)
    except NotFoundError:
        print(f'"{ELSER_MODEL}" model not available, downloading it now')
        es.ml.put_trained_model(
            model_id=ELSER_MODEL, input={"field_names": ["text_field"]}
        )
        while not definition_complete():
            time.sleep(1)

    # Step 2: start a deployment unless one is already fully allocated.
    needs_deployment = not is_elser_fully_allocated()
    if needs_deployment:
        try:
            es.ml.start_trained_model_deployment(
                model_id=ELSER_MODEL, wait_for="fully_allocated"
            )
        except BadRequestError:
            # Already started, and likely fully allocated
            pass
        else:
            print(f'"{ELSER_MODEL}" model is deployed')

    print(f'"{ELSER_MODEL}" model is ready')
def is_elser_fully_allocated():
    """Return True when the stats for ELSER_MODEL report a fully allocated deployment.

    Missing "deployment_stats", "allocation_status" or "state" entries are
    treated as "not allocated" and yield False.
    """
    response = es.ml.get_trained_models_stats(model_id=ELSER_MODEL)
    first_model = response["trained_model_stats"][0]
    state = (
        first_model.get("deployment_stats", {})
        .get("allocation_status", {})
        .get("state")
    )
    return state == "fully_allocated"
def main():
    """Install ELSER, then load, chunk and index the documents found in FILE.

    Steps:
      1. Ensure the ELSER model is installed and deployed (install_elser).
      2. Read the JSON array in FILE and wrap every entry in a Document whose
         page content is the entry's "content" and whose metadata holds the
         keys listed in ``metadata_keys`` (missing keys become None).
      3. Split the documents into token-based chunks.
      4. Delete INDEX if present and add the chunks through an
         ElasticsearchStore configured with the sparse vector strategy.

    When the first indexing attempt times out (ConnectionTimeout, or an
    ApiError with status 408) the function waits for running ML tasks and
    retries once.

    Raises:
        ApiError: for Elasticsearch errors other than a 408 timeout
            (BadRequestError is deliberately ignored, see below).
        TimeoutError: propagated from await_ml_tasks when ML tasks are still
            running after its timeout.
    """
    install_elser()

    print(f"Loading data from {FILE}")

    metadata_keys = ["name", "summary", "url", "category", "updated_at"]
    # JSON text is UTF-8; do not depend on the platform's default encoding.
    with open(FILE, "rt", encoding="utf-8") as f:
        workplace_docs = [
            Document(
                page_content=doc["content"],
                metadata={k: doc.get(k) for k in metadata_keys},
            )
            for doc in json.load(f)
        ]

    print(f"Loaded {len(workplace_docs)} documents")

    text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
        chunk_size=512, chunk_overlap=256
    )

    docs = text_splitter.transform_documents(workplace_docs)

    print(f"Split {len(workplace_docs)} documents into {len(docs)} chunks")

    print(f"Creating Elasticsearch sparse vector store for {ELASTICSEARCH_URL}")

    store = ElasticsearchStore(
        es_connection=es,
        index_name=INDEX,
        strategy=ElasticsearchStore.SparseVectorRetrievalStrategy(model_id=ELSER_MODEL),
    )

    # The first call creates ML tasks to support the index, and typically fails
    # with the default 10-second timeout, at least when Elasticsearch is a
    # container running on Apple Silicon.
    #
    # Once elastic/elasticsearch#107077 is fixed, we can use bulk_kwargs to
    # adjust the timeout.
    print(f"Adding documents to index {INDEX}")

    # Only show a spinner on an interactive terminal. Remember the instance so
    # the cleanup below does not depend on a second isatty() check.
    spinner = None
    if stdout.isatty():
        spinner = Halo(text="Processing bulk operation", spinner="dots")
        spinner.start()

    try:
        es.indices.delete(index=INDEX, ignore_unavailable=True)
        store.add_documents(list(docs))
    except BadRequestError:
        # This error means the index already exists
        pass
    except (ConnectionTimeout, ApiError) as e:
        # Anything other than a timeout is a real failure.
        if isinstance(e, ApiError) and e.status_code != 408:
            raise
        warn(f"Error occurred, will retry after ML jobs complete: {e}")
        await_ml_tasks()
        es.indices.delete(index=INDEX, ignore_unavailable=True)
        store.add_documents(list(docs))
    finally:
        # Stop the spinner on every exit path, including errors raised by the
        # first attempt or by the retry, so it does not keep running.
        if spinner is not None:
            spinner.stop()

    print(f"Documents added to index {INDEX}")
def await_ml_tasks(max_timeout=1200, interval=5):
    """
    Block until no machine learning tasks are reported, or give up after a timeout.

    Returns immediately when no ML tasks are running on the first check.

    Parameters:
        max_timeout (int): Upper bound, in seconds, on the total time spent waiting.
        interval (int): Pause, in seconds, between two consecutive checks.

    Raises:
        TimeoutError: If ML tasks are still running once max_timeout has elapsed.
    """
    began_at = time.time()
    pending = get_ml_tasks()

    if pending:
        print(f"Awaiting {len(pending)} ML tasks")

        while time.time() - began_at < max_timeout:
            pending = get_ml_tasks()
            if not pending:
                return
            time.sleep(interval)

        # The deadline passed with tasks still listed by the last check.
        raise TimeoutError(
            f"Timeout reached. ML tasks are still running: {', '.join(pending)}"
        )

    # Nothing was running on the first check: likely a lost race on tasks.
    return
def get_ml_tasks():
    """Collect the "action" of every ML task listed by the ES tasks API, across all nodes."""
    response = es.tasks.list(detailed=True, actions=["cluster:monitor/xpack/ml/*"])
    return [
        task["action"]
        for node in response["nodes"].values()
        # A node entry without a "tasks" key contributes nothing.
        for task in node.get("tasks", {}).values()
    ]
# This module is not meant to be executed directly: unless we run through
# flask, we can miss critical settings or telemetry signals. Direct execution
# therefore fails immediately and names the supported command.
if __name__ == "__main__":
    raise RuntimeError("Run via the parent directory: 'flask create-index'")