in src/frontends/streamlit/frontend/utils/stream_handler.py [0:0]
def process_events(self) -> None:
    """Stream one agent run and record its outcome in the session state.

    The current session's chat history is sent to
    ``self.client.stream_messages`` under a freshly generated run id, which
    is also stored in the session state. While the stream is consumed:

    * tool calls are appended to ``self.tool_calls`` and announced through
      ``self.stream_handler.new_status``;
    * tool responses are appended to ``self.tool_calls`` and announced the
      same way;
    * any other non-empty content is added to ``self.final_content`` and
      forwarded to ``self.stream_handler.new_token``.

    When the stream ends and ``self.final_content`` is non-empty, the
    collected tool messages plus a final AI message are added to the
    session's chat history. Otherwise the history is left untouched.
    """
    state = self.st.session_state
    history = state.user_chats[state["session_id"]]["messages"]

    # Publish the run id in the session state before streaming begins.
    self.current_run_id = str(uuid.uuid4())
    state["run_id"] = self.current_run_id

    request = {
        "input": {"messages": history},
        "config": {
            "run_id": self.current_run_id,
            "metadata": {
                "user_id": state["user_id"],
                "session_id": state["session_id"],
            },
        },
    }
    events = self.client.stream_messages(data=request)

    # Each streamed event is a (message, metadata) tuple; metadata is unused.
    # See https://langchain-ai.github.io/langgraph/how-tos/streaming/#messages
    for message, _ in events:
        # Only dict messages tagged "constructor" are handled; skip the rest.
        if not isinstance(message, dict) or message.get("type") != "constructor":
            continue
        payload = message["kwargs"]

        requested_calls = payload.get("tool_calls")
        if requested_calls:
            # Tool calls: store them as an AI message with empty content,
            # then emit one status line per call.
            self.tool_calls.append(
                AIMessage(content="", tool_calls=requested_calls).model_dump()
            )
            for call in requested_calls:
                self.stream_handler.new_status(
                    f"\n\nCalling tool: `{call['name']}` with args: `{call['args']}`"
                )
        elif payload.get("tool_call_id"):
            # Tool response: store it and surface its content as a status.
            tool_output = payload["content"]
            self.tool_calls.append(
                ToolMessage(
                    content=tool_output,
                    type="tool",
                    tool_call_id=payload["tool_call_id"],
                ).model_dump()
            )
            self.stream_handler.new_status(f"\n\nTool response: `{tool_output}`")
        else:
            # AI response: accumulate non-empty content and stream it out.
            text = payload.get("content")
            if text:
                self.final_content += text
                self.stream_handler.new_token(text)

    # End of stream: nothing is persisted unless some content was produced.
    if not self.final_content:
        return

    closing_message = AIMessage(
        content=self.final_content,
        id=self.current_run_id,
        additional_kwargs=self.additional_kwargs,
    ).model_dump()
    chat = state.user_chats[state["session_id"]]
    # A new list is assigned rather than extending the existing one in place.
    chat["messages"] = chat["messages"] + self.tool_calls + [closing_message]
    state.run_id = self.current_run_id