llm_demo/orchestrator/langgraph/react_graph.py (161 lines of code) (raw):

# Copyright 2024 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import json import uuid from typing import Annotated, Literal, Sequence, TypedDict from aiohttp import ClientSession from langchain_core.messages import ( AIMessage, BaseMessage, HumanMessage, ToolCall, ToolMessage, ) from langchain_core.prompts.chat import ChatPromptTemplate from langchain_core.runnables import RunnableConfig, RunnableLambda from langchain_google_vertexai import ChatVertexAI from langgraph.checkpoint.memory import MemorySaver from langgraph.graph import END, StateGraph from langgraph.graph.message import add_messages from langgraph.managed import IsLastStep from .tool_node import ToolNode from .tools import ( TicketInfo, get_confirmation_needing_tools, insert_ticket, validate_ticket, ) class UserState(TypedDict): """ State with messages and ClientSession for each session/user. """ messages: Annotated[Sequence[BaseMessage], add_messages] user_id_token: str is_last_step: IsLastStep async def create_graph( tools, checkpointer: MemorySaver, prompt: ChatPromptTemplate, model_name: str, client: ClientSession, debug: bool, ): """ Creates a graph that works with a chat model that utilizes tool calling. Args: tools: A list of StructuredTools that will bind with the chat model. checkpointer: The checkpoint saver object. This is useful for persisting the state of the graph (e.g., as chat memory). prompt: Initial prompt for the model. This applies to messages before they are passed into the LLM. model_name: The chat model name. Returns: A compilled LangChain runnable that can be used for chat interactions. The resulting graph looks like this: [*] --> Start Start --> Agent Agent --> Tools : continue Tools --> Agent Agent --> End : end End --> [*] """ # tool node tool_node = ToolNode(tools) # model node # TODO: Use .bind_tools(tools) to bind the tools with the LLM. model = ChatVertexAI(max_output_tokens=512, model_name=model_name, temperature=0.0) # Add the prompt to the model to create a model runnable model_runnable = prompt | model async def acall_model(state: UserState, config: RunnableConfig): """ The node representing async function that calls the model. After invoking model, it will return AIMessage back to the user. """ messages = state["messages"] res = await model_runnable.ainvoke({"messages": messages}, config) # TODO: Remove the temporary fix of parsing LLM response and invoking # tools until we use bind_tools API and have automatic response parsing # and tool calling. (see # https://langchain-ai.github.io/langgraph/#example) if "```json" in res.content: try: response = str(res.content).replace("```json", "").replace("```", "") json_response = json.loads(response) action = json_response.get("action") action_input = json_response.get("action_input") if action == "Final Answer": res = AIMessage(content=action_input) else: res = AIMessage( content="suggesting a tool call", tool_calls=[ ToolCall( id=str(uuid.uuid4()), name=action, args=action_input ) ], ) except Exception as e: json_response = response res = AIMessage( content="Sorry, failed to generate the right format for response" ) # if model exceed the number of steps and has not yet return a final answer if state["is_last_step"] and hasattr(res, "tool_calls"): return { "messages": [ AIMessage( content="Sorry, need more steps to process this request.", ) ] } return {"messages": [res]} def agent_should_continue( state: UserState, ) -> Literal["booking_validation", "continue", "end"]: """ Function to determine which node is called after the agent node. """ messages = state["messages"] last_message = messages[-1] # If the LLM makes a tool call, then we route to the "tools" node if hasattr(last_message, "tool_calls") and len(last_message.tool_calls) > 0: confirmation_needing_tools = get_confirmation_needing_tools() for tool_call in last_message.tool_calls: tool_name = tool_call["name"] if tool_name in confirmation_needing_tools: if tool_name == "Insert Ticket": return "booking_validation" return "continue" # Otherwise, we stop (reply to the user) return "end" async def booking_validation_node(state: UserState, config: RunnableConfig): """ The node representing async function that validate the ticket. After ticket validation, it will return AIMessage with updated ticket args. """ messages = state["messages"] last_message = messages[-1] user_id_token = state["user_id_token"] if hasattr(last_message, "tool_calls") and len(last_message.tool_calls) > 0: tool_call = last_message.tool_calls[0] # Run ticket validation and return the correct ticket information flight_info = await validate_ticket( client, tool_call.get("args"), user_id_token ) new_message = AIMessage( content="Please confirm if you would like to book the ticket.", tool_calls=[ ToolCall( id=str(uuid.uuid4()), name=tool_call.get("name"), args=flight_info, ) ], additional_kwargs={"confirmation": True}, ) return {"messages": [new_message]} def booking_should_continue(state: UserState) -> Literal["continue", "agent"]: """ Function to determine which node is called after human response on ticket booking. """ messages = state["messages"] last_message = messages[-1] # If last message makes a tool call, then we route to the "tools" node to proceed with booking if hasattr(last_message, "tool_calls") and len(last_message.tool_calls) > 0: return "continue" # Otherwise, send response back to agent return "agent" async def insert_ticket_node(state: UserState, config: RunnableConfig): """ Node to update human response to prevent """ messages = state["messages"] last_message = messages[-1] user_id_token = state["user_id_token"] # Run insert ticket if hasattr(last_message, "tool_calls") and len(last_message.tool_calls) > 0: tool_call = last_message.tool_calls[0] tool_args = tool_call.get("args") ticket_info = TicketInfo(**tool_args) output = await insert_ticket(client, ticket_info, user_id_token) tool_call_id = tool_call.get("id") tool_message = ToolMessage( content=output, name="Insert Ticket", tool_call_id=tool_call_id ) human_message = HumanMessage(content="Looks good to me.") ai_message = AIMessage(content=output) return {"messages": [human_message, tool_message, ai_message]} # Define constant node strings AGENT_NODE = "agent" TOOL_NODE = "tools" BOOKING_VALIDATION_NODE = "booking_validation" INSERT_TICKET_NODE = "insert_ticket" # Define a new graph llm_graph = StateGraph(UserState) llm_graph.add_node(AGENT_NODE, RunnableLambda(acall_model)) llm_graph.add_node(TOOL_NODE, tool_node) llm_graph.add_node(BOOKING_VALIDATION_NODE, RunnableLambda(booking_validation_node)) llm_graph.add_node(INSERT_TICKET_NODE, RunnableLambda(insert_ticket_node)) # Set agent node as the first node to call llm_graph.set_entry_point(AGENT_NODE) # Add edges llm_graph.add_conditional_edges( AGENT_NODE, agent_should_continue, { "continue": TOOL_NODE, "booking_validation": BOOKING_VALIDATION_NODE, "end": END, }, ) llm_graph.add_edge(TOOL_NODE, AGENT_NODE) llm_graph.add_conditional_edges( BOOKING_VALIDATION_NODE, booking_should_continue, {"continue": INSERT_TICKET_NODE, "agent": AGENT_NODE}, ) llm_graph.add_edge(INSERT_TICKET_NODE, END) # Compile graph into a LangChain Runnable langgraph_app = llm_graph.compile( checkpointer=checkpointer, debug=debug, interrupt_after=["booking_validation"] ) return langgraph_app