vectordb-genai-101/chat-app-code/backend/routers/search_router.py

import logging

from fastapi import APIRouter, HTTPException, WebSocket
from pydantic import BaseModel

from backend.services import search_service, inference_service, llm_service
# NOTE: bedrock_service provides query_aws_bedrock(), used by the /search endpoint
# below; it is assumed to live in backend.services alongside the other service modules.
from backend.services import bedrock_service
from elasticsearch import NotFoundError

router = APIRouter()

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(module)s:%(lineno)d - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

# Set this to True to stream LLM responses back to the client.
# Streaming is not implemented in this version.
streaming_llm = False


class SearchQuery(BaseModel):
    query: str
    context_type: str


@router.post("/search")
async def perform_search(search_query: SearchQuery):
    logging.info(f"Received query: {search_query.query}")
    try:
        prompt_context = search_service.semantic_search(search_query.query, search_query.context_type)
        llm_response = bedrock_service.query_aws_bedrock(prompt_context)
        return {"prompt": prompt_context, "llm_response": llm_response}
    except Exception as e:
        logging.error(f"Error in processing search: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))


class ChatMessage(BaseModel):
    message: str


@router.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()

    # Initialize the conversation history
    convo_history = llm_service.init_conversation_history()

    try:
        while True:
            # Receive the message from the client (the user's question)
            data = await websocket.receive_text()
            logging.debug(f"Raw data received: {data}")

            # Parse the user's question
            chat_message = ChatMessage.parse_raw(data)
            logging.info(f"Received message: {chat_message.message}")

            # Retrieve context for the question from Elasticsearch
            context_unparsed = search_service.perform_es_search(
                chat_message.message,
                "elastic-labs"
            )
            logging.info("Context received from perform_es_search")

            # Create a prompt for the LLM
            prompt = llm_service.create_llm_prompt(
                chat_message.message,
                context_unparsed,
                convo_history
            )
            logging.info(f"Created prompt for LLM: {prompt}")
            logging.info(f"Prompt length: {len(prompt)}")

            # Send the contextual data back to the UI before making LLM calls
            logging.debug(f"Sending context back to UI: {context_unparsed}")
            logging.info("building tmp_context")
            tmp_context = ""
            for hit in context_unparsed:
                tmp_context += f"{hit}\n\n"

            await websocket.send_json({
                "type": "verbose_info",
                "text": f"Context gathered from Elasticsearch\n\n{tmp_context}"
            })

            # Call the LLM to generate a response
            if not streaming_llm:
                logging.info(f"Sending prompt: {prompt}")
                # Use Elastic to call chat completion - the response is the full text
                response = inference_service.es_chat_completion(
                    prompt,
                    "openai_chat_completions"
                )
                logging.info(f"Response from LLM: {response}")

                logging.info("Sending response to client")
                await websocket.send_json({
                    "type": "full_response",
                    "text": response
                })

                # Add the user's question and the LLM response to the conversation history
                logging.info("Building conversation history")
                convo_history = llm_service.build_conversation_history(
                    history=convo_history,
                    user_message=chat_message.message,
                    ai_response=response
                )
                logging.debug(f"Conversation history: {convo_history}")

                tmp_convo_hist = '\n---------------------------------------------------------\n\n'.join(
                    [str(h) for h in convo_history]
                )
                await websocket.send_json({
                    "type": "verbose_info",
                    "text": f"Conversation history updated:\n\n{tmp_convo_hist}"
                })
    except Exception as e:
        logging.error("WebSocket encountered an error:", exc_info=True)
        await websocket.close(code=1001)
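

# ---------------------------------------------------------------------------
# Illustrative usage (a minimal sketch, not part of the application itself).
# It shows how this router could be mounted on a FastAPI app and how a client
# could exercise the /ws endpoint. It assumes the `uvicorn` and `websockets`
# packages are installed and the backend services above are configured; the
# host, port, the "client" CLI argument, and the sample question are
# placeholders, not the project's actual entry point.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import asyncio
    import json
    import sys

    if len(sys.argv) > 1 and sys.argv[1] == "client":
        # Exercise the /ws endpoint: send one question as JSON (matching the
        # ChatMessage model) and print each message the server pushes back
        # (verbose_info updates, then the full_response).
        import websockets

        async def demo_client() -> None:
            async with websockets.connect("ws://localhost:8000/ws") as ws:
                await ws.send(json.dumps({"message": "What is a vector database?"}))
                while True:
                    reply = json.loads(await ws.recv())
                    print(f"[{reply['type']}] {reply['text'][:200]}")
                    if reply["type"] == "full_response":
                        break

        asyncio.run(demo_client())
    else:
        # Mount this router on a FastAPI app and serve it.
        import uvicorn
        from fastapi import FastAPI

        app = FastAPI()
        app.include_router(router)
        uvicorn.run(app, host="0.0.0.0", port=8000)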