# main() — from example-apps/chatbot-rag-app/data/index_data.py

def main():
    """Load workplace documents from FILE, split them into chunks, and bulk-index
    them into Elasticsearch using the ELSER sparse-vector strategy.

    Ensures the ELSER model is deployed, reads the JSON source file, splits each
    document into overlapping chunks, then (re)creates the target index and adds
    the chunks. If the first bulk attempt times out while ML tasks are still
    spinning up, it waits for them and retries once.
    """
    install_elser()

    # BUG FIX: was f"...${FILE}" — the '$' is literal in a Python f-string and
    # printed a stray '$' before the path.
    print(f"Loading data from {FILE}")

    metadata_keys = ["name", "summary", "url", "category", "updated_at"]
    with open(FILE, "rt") as f:
        workplace_docs = [
            Document(
                page_content=doc["content"],
                # Missing metadata keys become None rather than raising KeyError.
                metadata={k: doc.get(k) for k in metadata_keys},
            )
            for doc in json.loads(f.read())
        ]

    print(f"Loaded {len(workplace_docs)} documents")

    text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
        chunk_size=512, chunk_overlap=256
    )

    # Materialize once: the retry path below indexes the same chunks a second
    # time, which would silently index nothing if this were a lazy iterable.
    docs = list(text_splitter.transform_documents(workplace_docs))

    print(f"Split {len(workplace_docs)} documents into {len(docs)} chunks")

    print(f"Creating Elasticsearch sparse vector store for {ELASTICSEARCH_URL}")

    store = ElasticsearchStore(
        es_connection=es,
        index_name=INDEX,
        strategy=ElasticsearchStore.SparseVectorRetrievalStrategy(model_id=ELSER_MODEL),
    )

    def recreate_and_index():
        # Drop any stale/partial index from a previous run, then bulk-index.
        es.indices.delete(index=INDEX, ignore_unavailable=True)
        store.add_documents(docs)

    # The first call creates ML tasks to support the index, and typically fails
    # with the default 10-second timeout, at least when Elasticsearch is a
    # container running on Apple Silicon.
    #
    # Once elastic/elasticsearch#107077 is fixed, we can use bulk_kwargs to
    # adjust the timeout.

    print(f"Adding documents to index {INDEX}")

    # Only show the spinner when attached to a terminal (not when piped/logged).
    spinner = None
    if stdout.isatty():
        spinner = Halo(text="Processing bulk operation", spinner="dots")
        spinner.start()

    try:
        try:
            recreate_and_index()
        except BadRequestError:
            # This error means the index already exists
            pass
        except (ConnectionTimeout, ApiError) as e:
            # Only retry on timeouts (ConnectionTimeout, or ApiError with HTTP
            # 408); anything else is a real failure and is re-raised.
            if isinstance(e, ApiError) and e.status_code != 408:
                raise
            warn(f"Error occurred, will retry after ML jobs complete: {e}")
            await_ml_tasks()
            recreate_and_index()
    finally:
        # Stop the spinner even if an exception propagates, so the terminal
        # is left in a clean state.
        if spinner:
            spinner.stop()

    print(f"Documents added to index {INDEX}")