example-apps/chatbot-rag-app/api/chat.py:

import json
import os
from elasticsearch_client import (
    elasticsearch_client,
    get_elasticsearch_chat_message_history,
)
from flask import current_app, render_template, stream_with_context
from functools import cache
from langchain_elasticsearch import (
    ElasticsearchStore,
    SparseVectorStrategy,
)
from llm_integrations import get_llm

INDEX = os.getenv("ES_INDEX", "workplace-app-docs")
INDEX_CHAT_HISTORY = os.getenv(
    "ES_INDEX_CHAT_HISTORY", "workplace-app-docs-chat-history"
)
ELSER_MODEL = os.getenv("ELSER_MODEL", ".elser_model_2")

# Tags that prefix special events in the server-sent-events stream, so the
# client can tell the session id, retrieved sources, and end-of-stream apart
# from ordinary answer tokens.
SESSION_ID_TAG = "[SESSION_ID]"
SOURCE_TAG = "[SOURCE]"
DONE_TAG = "[DONE]"

# Vector store backed by Elasticsearch, using the ELSER sparse-vector model
# for retrieval.
store = ElasticsearchStore(
    es_connection=elasticsearch_client,
    index_name=INDEX,
    strategy=SparseVectorStrategy(model_id=ELSER_MODEL),
)


@cache
def get_lazy_llm():
    # Defer LLM construction until the first request, then reuse the same
    # instance for the life of the process.
    return get_llm()


@stream_with_context
def ask_question(question, session_id):
    llm = get_lazy_llm()
    yield f"data: {SESSION_ID_TAG} {session_id}\n\n"
    current_app.logger.debug("Chat session ID: %s", session_id)

    chat_history = get_elasticsearch_chat_message_history(
        INDEX_CHAT_HISTORY, session_id
    )

    if len(chat_history.messages) > 0:
        # Condense the follow-up question plus the prior conversation into a
        # standalone question, so retrieval has the full context.
        condense_question_prompt = render_template(
            "condense_question_prompt.txt",
            question=question,
            chat_history=chat_history.messages,
        )
        condensed_question = llm.invoke(condense_question_prompt).content
    else:
        condensed_question = question

    current_app.logger.debug("Condensed question: %s", condensed_question)
    current_app.logger.debug("Question: %s", question)

    # Retrieve relevant passages and stream each one to the client as a
    # [SOURCE] event before answering.
    docs = store.as_retriever().invoke(condensed_question)
    for doc in docs:
        doc_source = {**doc.metadata, "page_content": doc.page_content}
        current_app.logger.debug(
            "Retrieved document passage from: %s", doc.metadata["name"]
        )
        yield f"data: {SOURCE_TAG} {json.dumps(doc_source)}\n\n"

    qa_prompt = render_template(
        "rag_prompt.txt",
        question=question,
        docs=docs,
        chat_history=chat_history.messages,
    )

    # Stream the answer token by token as plain data events.
    answer = ""
    for chunk in llm.stream(qa_prompt):
        content = chunk.content.replace(
            "\n", " "
        )  # the stream can get messed up with newlines
        yield f"data: {content}\n\n"
        answer += chunk.content

    yield f"data: {DONE_TAG}\n\n"
    current_app.logger.debug("Answer: %s", answer)

    # Persist both sides of the exchange so the next turn can be condensed.
    chat_history.add_user_message(question)
    chat_history.add_ai_message(answer)
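
For context, here is a minimal sketch of how a route could expose ask_question as a server-sent-events endpoint. The /api/chat path, the standalone Flask app object, and the uuid-based session handling are assumptions for illustration; they are not part of chat.py:

# Hypothetical route module (not part of chat.py) wiring ask_question
# into an SSE response. Paths and session handling are assumptions.
import uuid

from flask import Flask, Response, jsonify, request

from chat import ask_question

app = Flask(__name__)


@app.route("/api/chat", methods=["POST"])
def api_chat():
    data = request.get_json()
    question = data.get("question")
    if question is None:
        return jsonify({"msg": "Missing question from request JSON"}), 400

    # Reuse the client's session id if provided, otherwise mint a new one;
    # ask_question echoes it back in the first SSE event.
    session_id = request.args.get("session_id", str(uuid.uuid4()))
    return Response(
        ask_question(question, session_id), mimetype="text/event-stream"
    )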
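
And a sketch of how a client could parse the tagged stream that ask_question emits. The base URL, port, and sample question are assumptions for illustration; the [SESSION_ID], [SOURCE], and [DONE] tags come from chat.py itself:

# Hypothetical client (not part of the app) parsing the SSE protocol.
# Assumes the route sketched above is served at http://localhost:4000;
# adjust to your setup.
import json

import requests

resp = requests.post(
    "http://localhost:4000/api/chat",
    json={"question": "How many vacation days do I get?"},
    stream=True,
)

answer_parts = []
for line in resp.iter_lines(decode_unicode=True):
    if not line or not line.startswith("data: "):
        continue  # skip blank separators between events
    payload = line[len("data: "):]
    if payload.startswith("[SESSION_ID]"):
        print("session:", payload.removeprefix("[SESSION_ID] "))
    elif payload.startswith("[SOURCE]"):
        doc = json.loads(payload.removeprefix("[SOURCE] "))
        print("source:", doc.get("name"))
    elif payload == "[DONE]":
        break  # end of stream
    else:
        # Ordinary answer tokens; the server already stripped newlines.
        answer_parts.append(payload)

print("answer:", "".join(answer_parts))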