packages/blueprints/gen-ai-chatbot/static-assets/chatbot-genai-components/backend/python/app/usecases/chat.py
def chat(user_id: str, chat_input: ChatInput) -> ChatOutput:
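    """Handle a single chat turn.

    Optionally retrieves related documents for the bot (RAG), invokes the
    model, persists the assistant reply in the conversation tree, and
    returns a ChatOutput.
    """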
    user_msg_id, conversation, bot = prepare_conversation(user_id, chat_input)
    message_map = conversation.message_map
    search_results = []
    if bot and is_running_on_lambda():
        # NOTE: `is_running_on_lambda` is a workaround for local testing, since there is no Postgres mock.
        # Fetch the most relevant documents from the vector store.
        # NOTE: Embeddings do not support multi-modal input yet, so use only the last (text) content as the query.
        query = conversation.message_map[user_msg_id].content[-1].body
        search_results = search_related_docs(
            bot_id=bot.id, limit=bot.search_params.max_results, query=query
        )
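        # NOTE: each search result is assumed to carry `content`, `source`, and
        # `rank` fields; they are consumed when building ChunkModel entries below.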
        logger.info(f"Search results from vector store: {search_results}")
        # Insert the retrieved contexts into the instruction message.
        conversation_with_context = insert_knowledge(
            conversation, search_results, display_citation=bot.display_retrieved_chunks
        )
        message_map = conversation_with_context.message_map
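    # The conversation is stored as a message tree; walk from the new message's
    # parent back to the root to recover the linear history for the prompt.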
    messages = trace_to_root(
        node_id=chat_input.message.parent_message_id, message_map=message_map
    )
    messages.append(chat_input.message)  # type: ignore
    # Create the payload to invoke Bedrock.
    args = compose_args(
        messages=messages,
        model=chat_input.message.model,
        instruction=(
            message_map["instruction"].content[0].body
            if "instruction" in message_map
            else None
        ),
        generation_params=(bot.generation_params if bot else None),
    )
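    # Anthropic models go through the Anthropic SDK client; everything else is
    # invoked via Bedrock, so the two response shapes are handled separately.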
    if is_anthropic_model(args["model"]):
        client = get_anthropic_client()
        response: AnthropicMessage = client.messages.create(**args)
        reply_txt = response.content[0].text
    else:
        response = get_bedrock_response(args)  # type: ignore
        reply_txt = response["outputs"][0]["text"]  # type: ignore
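        # NOTE: the `outputs[0].text` path above assumes the dict shape that
        # get_bedrock_response returns for non-Anthropic models.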
    # Collect the chunks that were used for RAG generation.
    used_chunks = None
    if bot and bot.display_retrieved_chunks and is_running_on_lambda():
        if len(search_results) > 0:
            used_chunks = []
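            # Keep only the results the reply actually drew on; filter_used_results
            # presumably matches citations in the reply text against the results.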
            for r in filter_used_results(reply_txt, search_results):
                content_type, source_link = get_source_link(r.source)
                used_chunks.append(
                    ChunkModel(
                        content=r.content,
                        content_type=content_type,
                        source=source_link,
                        rank=r.rank,
                    )
                )
    # Issue an ID for the new assistant message.
    assistant_msg_id = str(ULID())
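    # NOTE: ULIDs are lexicographically sortable, so assistant message IDs
    # also order by creation time.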
    # Append the Bedrock output to the existing conversation.
    message = MessageModel(
        role="assistant",
        content=[ContentModel(content_type="text", body=reply_txt, media_type=None)],
        model=chat_input.message.model,
        children=[],
        parent=user_msg_id,
        create_time=get_current_time(),
        feedback=None,
        used_chunks=used_chunks,
        thinking_log=None,
    )
    conversation.message_map[assistant_msg_id] = message
    # Register the new message as a child of its parent.
    conversation.message_map[user_msg_id].children.append(assistant_msg_id)
    conversation.last_message_id = assistant_msg_id
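    # Update the total price. Token counts are reported differently by the two
    # backends, so read them from the matching response shape.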
    if is_anthropic_model(args["model"]):
        # The Anthropic client reports token usage directly on the response.
        input_tokens = response.usage.input_tokens
        output_tokens = response.usage.output_tokens
    else:
        metrics: InvocationMetrics = response["amazon-bedrock-invocationMetrics"]  # type: ignore
        input_tokens = metrics.input_tokens
        output_tokens = metrics.output_tokens
    price = calculate_price(chat_input.message.model, input_tokens, output_tokens)
    conversation.total_price += price
    # Store the updated conversation.
    store_conversation(user_id, conversation)
    # Update the bot's last-used time.
    if chat_input.bot_id:
        logger.info("Bot id is provided. Updating bot last used time.")
        modify_bot_last_used_time(user_id, chat_input.bot_id)
    output = ChatOutput(
        conversation_id=conversation.id,
        create_time=conversation.create_time,
        message=MessageOutput(
            role=message.role,
            content=[
                Content(
                    content_type=c.content_type,
                    body=c.body,
                    media_type=c.media_type,
                )
                for c in message.content
            ],
            model=message.model,
            children=message.children,
            parent=message.parent,
            feedback=None,
            used_chunks=(
                [
                    Chunk(
                        content=c.content,
                        content_type=c.content_type,
                        source=c.source,
                        rank=c.rank,
                    )
                    for c in message.used_chunks
                ]
                if message.used_chunks
                else None
            ),
        ),
        bot_id=conversation.bot_id,
    )
    return output
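
# A minimal usage sketch (hypothetical caller, e.g. an API handler; ChatInput
# construction elided):
#   output = chat(user_id="user-123", chat_input=chat_input)
#   print(output.message.content[0].body)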