# example-apps/chatbot-rag-app/data/index_data.py
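# Imports and module-level settings referenced below (FILE, INDEX, es,
# ELSER_MODEL, ELASTICSEARCH_URL, install_elser, await_ml_tasks) are defined
# earlier in the file.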
def main():
    install_elser()

    print(f"Loading data from {FILE}")
    metadata_keys = ["name", "summary", "url", "category", "updated_at"]
    workplace_docs = []
    with open(FILE, "rt") as f:
        for doc in json.loads(f.read()):
            workplace_docs.append(
                Document(
                    page_content=doc["content"],
                    metadata={k: doc.get(k) for k in metadata_keys},
                )
            )
print(f"Loaded {len(workplace_docs)} documents")
    text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
        chunk_size=512, chunk_overlap=256
    )
    docs = text_splitter.transform_documents(workplace_docs)
    print(f"Split {len(workplace_docs)} documents into {len(docs)} chunks")

print(f"Creating Elasticsearch sparse vector store for {ELASTICSEARCH_URL}")
store = ElasticsearchStore(
es_connection=es,
index_name=INDEX,
strategy=ElasticsearchStore.SparseVectorRetrievalStrategy(model_id=ELSER_MODEL),
)
    # The first call creates ML tasks to support the index, and typically
    # fails with the default 10-second timeout, at least when Elasticsearch
    # is a container running on Apple Silicon.
    #
    # Once elastic/elasticsearch#107077 is fixed, we can use bulk_kwargs to
    # adjust the timeout.
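    # Until then, catch the timeout below, wait for the ML tasks to finish,
    # and retry the bulk add once.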
print(f"Adding documents to index {INDEX}")
if stdout.isatty():
spinner = Halo(text="Processing bulk operation", spinner="dots")
spinner.start()
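    # Drop any existing index so the bulk add starts from a clean slate.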
    try:
        es.indices.delete(index=INDEX, ignore_unavailable=True)
        store.add_documents(list(docs))
    except BadRequestError:
        # This error means the index already exists.
        pass
    except (ConnectionTimeout, ApiError) as e:
        # Re-raise anything that isn't a request timeout (HTTP 408).
        if isinstance(e, ApiError) and e.status_code != 408:
            raise
        warn(f"Error occurred, will retry after ML jobs complete: {e}")
        await_ml_tasks()
        es.indices.delete(index=INDEX, ignore_unavailable=True)
        store.add_documents(list(docs))
    if stdout.isatty():
        spinner.stop()
    print(f"Documents added to index {INDEX}")