packages/blueprints/gen-ai-chatbot/static-assets/chatbot-genai-components/backend/python/app/agents/agent.py (548 lines of code) (raw):

# Standard library.
import json
import logging
import os
import re
import time
from abc import abstractmethod
from typing import Any, AsyncIterator, Callable, Iterator, Optional, Sequence, Union

# Project-local agent building blocks, tools, and configuration.
from app.agents.agent_iterator import AgentExecutorIterator
from app.agents.chain import Chain
from app.agents.langchain import BedrockLLM
from app.agents.parser import ReActSingleInputOutputParser
from app.agents.prompts import AGENT_PROMPT_FOR_CLAUDE
from app.agents.tools.base import BaseTool
from app.agents.tools.common.exception import ExceptionTool
from app.agents.tools.common.invalid import InvalidTool
from app.agents.tools.knowledge import AnswerWithKnowledgeTool
from app.config import DEFAULT_GENERATION_CONFIG as DEFAULT_CLAUDE_GENERATION_CONFIG
from app.config import DEFAULT_MISTRAL_GENERATION_CONFIG
from app.repositories.models.custom_bot import GenerationParamsModel
from app.routes.schemas.conversation import type_model_name

# LangChain core primitives used by the agent loop.
from langchain_core.agents import AgentAction, AgentFinish, AgentStep
from langchain_core.callbacks import (
    AsyncCallbackManagerForChainRun,
    CallbackManagerForChainRun,
    Callbacks,
)
from langchain_core.exceptions import OutputParserException
from langchain_core.prompts import PromptTemplate
from langchain_core.pydantic_v1 import root_validator
from langchain_core.runnables import (
    Runnable,
    RunnableConfig,
    RunnablePassthrough,
    ensure_config,
)
from langchain_core.runnables.utils import AddableDict
from langchain_core.utils.input import get_color_mapping

logger = logging.getLogger(__name__)

# Mistral mode is on only when the ENABLE_MISTRAL environment variable is
# exactly the string "true"; unset or any other value leaves it off.
ENABLE_MISTRAL = os.environ.get("ENABLE_MISTRAL", "") == "true"

# Default generation parameters: the Mistral defaults when Mistral mode is
# enabled, otherwise the Claude defaults imported from app.config.
DEFAULT_GENERATION_CONFIG = (
    DEFAULT_MISTRAL_GENERATION_CONFIG
    if ENABLE_MISTRAL
    else DEFAULT_CLAUDE_GENERATION_CONFIG
)

# The maximum number of steps to take before ending the execution loop.
MAX_ITERATIONS = 15

# Everything a single agent step can emit: the chosen action(s), the executed
# step(s) with their observations, or the final answer.
NextStepOutput = list[Union[AgentFinish, AgentAction, AgentStep]]


def format_log_to_str(
    intermediate_steps: list[tuple[AgentAction, str]],
    observation_prefix: str = "<observation>",
    observation_suffix: str = "</observation>",
    llm_prefix: str = "",
    llm_suffix: str = "",
) -> str:
    """Construct the scratchpad that lets the agent continue its thought process.

    Args:
        intermediate_steps: ``(action, observation)`` pairs taken so far.
        observation_prefix: Text placed immediately before each observation.
        observation_suffix: Text placed immediately after each observation.
        llm_prefix: Text appended after each observation block.
        llm_suffix: Text appended once at the very end of the scratchpad.

    Returns:
        The concatenation of every action log and its wrapped observation,
        followed by ``llm_suffix``.
    """
    parts: list[str] = []
    for action, observation in intermediate_steps:
        parts.append(action.log)
        parts.append(
            f"\n{observation_prefix}{observation}{observation_suffix}\n{llm_prefix}"
        )
    # NOTE(review): the suffix is appended once after all steps; the default is
    # "" so the only in-file caller is unaffected either way -- confirm intent.
    parts.append(llm_suffix)
    return "".join(parts)


class BaseSingleActionAgent:
    """Base Single Action Agent class.

    NOTE(review): the ``@abstractmethod`` markers below are documentation only;
    this class does not use ``ABCMeta``, so they are not enforced at
    instantiation time.
    """

    @property
    def return_values(self) -> list[str]:
        """Return values of the agent."""
        return ["output"]

    @abstractmethod
    def plan(
        self,
        intermediate_steps: list[tuple[AgentAction, str]],
        callbacks: Callbacks = None,
        **kwargs: Any,
    ) -> Union[AgentAction, AgentFinish]:
        """Given input, decide what to do.

        Args:
            intermediate_steps: Steps the LLM has taken to date,
                along with observations.
            callbacks: Callbacks to run.
            **kwargs: User inputs.

        Returns:
            Action specifying what tool to use.
        """

    @abstractmethod
    async def aplan(
        self,
        intermediate_steps: list[tuple[AgentAction, str]],
        callbacks: Callbacks = None,
        **kwargs: Any,
    ) -> Union[AgentAction, AgentFinish]:
        """Given input, decide what to do (async variant).

        Args:
            intermediate_steps: Steps the LLM has taken to date,
                along with observations.
            callbacks: Callbacks to run.
            **kwargs: User inputs.

        Returns:
            Action specifying what tool to use.
        """

    @property
    @abstractmethod
    def input_keys(self) -> list[str]:
        """Return the input keys.

        :meta private:
        """

    def return_stopped_response(
        self,
        early_stopping_method: str,
        intermediate_steps: list[tuple[AgentAction, str]],
        **kwargs: Any,
    ) -> AgentFinish:
        """Return response when agent has been stopped due to max iterations.

        Args:
            early_stopping_method: Only ``"force"`` is supported.
            intermediate_steps: Steps taken so far (unused by ``"force"``).
            **kwargs: User inputs (unused by ``"force"``).

        Returns:
            An ``AgentFinish`` carrying a constant "stopped" message.

        Raises:
            ValueError: If ``early_stopping_method`` is not ``"force"``.
        """
        if early_stopping_method != "force":
            raise ValueError(
                f"Got unsupported early_stopping_method `{early_stopping_method}`"
            )
        # `force` just returns a constant string
        return AgentFinish(
            {"output": "Agent stopped due to iteration limit or time limit."},
            "",
        )

    def tool_run_logging_kwargs(self) -> dict:
        """Return extra keyword arguments forwarded to every ``tool.run`` call."""
        return {}


class RunnableAgent(BaseSingleActionAgent):
    """Agent powered by runnables."""

    def __init__(
        self,
        runnable: Runnable[dict, Union[AgentAction, AgentFinish]],
        input_keys_arg: Optional[list[str]] = None,
        return_keys_arg: Optional[list[str]] = None,
        stream_runnable: bool = True,
    ) -> None:
        """Wrap ``runnable`` so it can be driven by ``AgentExecutor``.

        Args:
            runnable: Runnable that maps the inputs dict to an action or finish.
            input_keys_arg: Keys reported by ``input_keys`` (default: empty).
            return_keys_arg: Keys reported by ``return_values`` (default: empty).
            stream_runnable: Whether ``plan``/``aplan`` call the runnable in
                streaming mode and accumulate the chunks.
        """
        self.runnable = runnable
        # Fresh lists per instance: the previous `[]` defaults were shared by
        # every RunnableAgent constructed without these arguments.
        self.input_keys_arg = [] if input_keys_arg is None else input_keys_arg
        self.return_keys_arg = [] if return_keys_arg is None else return_keys_arg
        self.stream_runnable = stream_runnable

    class Config:
        """Configuration for this pydantic object."""

        # NOTE(review): RunnableAgent is a plain class in this file, so this
        # pydantic-style Config looks unused -- confirm before removing.
        arbitrary_types_allowed = True

    @property
    def return_values(self) -> list[str]:
        """Return values of the agent."""
        return self.return_keys_arg

    @property
    def input_keys(self) -> list[str]:
        """Return the input keys supplied at construction time."""
        return self.input_keys_arg

    def plan(
        self,
        intermediate_steps: list[tuple[AgentAction, str]],
        callbacks: Callbacks = None,
        **kwargs: Any,
    ) -> Union[AgentAction, AgentFinish]:
        """Based on past history and current inputs, decide what to do.

        Args:
            intermediate_steps: Steps the LLM has taken to date,
                along with the observations.
            callbacks: Callbacks to run.
            **kwargs: User inputs.

        Returns:
            Action specifying what tool to use.
        """
        inputs = {**kwargs, "intermediate_steps": intermediate_steps}
        config = {"callbacks": callbacks}
        if not self.stream_runnable:
            return self.runnable.invoke(inputs, config=config)

        # Use streaming to make sure that the underlying LLM is invoked in a
        # streaming fashion to make it possible to get access to the individual
        # LLM tokens when using stream_log with the Agent Executor.
        # Because the response from the plan is not a generator, we need to
        # accumulate the output into final output and return that.
        final_output: Any = None
        for chunk in self.runnable.stream(inputs, config=config):
            if final_output is None:
                final_output = chunk
            else:
                final_output += chunk
        return final_output

    async def aplan(
        self,
        intermediate_steps: list[tuple[AgentAction, str]],
        callbacks: Callbacks = None,
        **kwargs: Any,
    ) -> Union[AgentAction, AgentFinish]:
        """Based on past history and current inputs, decide what to do.

        Args:
            intermediate_steps: Steps the LLM has taken to date,
                along with observations.
            callbacks: Callbacks to run.
            **kwargs: User inputs.

        Returns:
            Action specifying what tool to use.
        """
        inputs = {**kwargs, "intermediate_steps": intermediate_steps}
        config = {"callbacks": callbacks}
        if not self.stream_runnable:
            return await self.runnable.ainvoke(inputs, config=config)

        # Same accumulation strategy as `plan`, using the async stream.
        final_output: Any = None
        async for chunk in self.runnable.astream(inputs, config=config):
            if final_output is None:
                final_output = chunk
            else:
                final_output += chunk
        return final_output


def create_react_agent(
    model: type_model_name,
    tools: list[BaseTool],
    generation_config: GenerationParamsModel | None = None,
) -> BaseSingleActionAgent:
    """Build the ReAct runnable pipeline: scratchpad -> prompt -> LLM -> parser.

    Args:
        model: Model identifier passed to ``BedrockLLM.from_model``.
        tools: Tools advertised to the model in the prompt.
        generation_config: Generation parameters; when omitted, parameters are
            built from ``DEFAULT_GENERATION_CONFIG``.

    Returns:
        The composed runnable (typed as an agent; ``AgentExecutor`` wraps it in
        a ``RunnableAgent`` during validation).
    """

    def _describe_tool(tool: BaseTool) -> str:
        # One XML-ish description per tool, listing every parameter.
        params_xml = "".join(
            [
                f"<parameter><name>{param['name']}</name><type>{param['type']}</type><description>{param['description']}</description><is_required>{param['is_required']}</is_required></parameter>"
                for param in tool.extract_params_and_descriptions()
            ]
        )
        # NOTE(review): whitespace between the tags is reproduced as single
        # spaces, as it appears in the source -- confirm whether line breaks
        # were intended here.
        return (
            f"<tool_name>{tool.name}</tool_name> <parameters> "
            f"{params_xml} </parameters> "
            f"<tool_description>{tool.description}</tool_description> "
        )

    TOOLS_PROMPT = "\n".join([_describe_tool(tool) for tool in tools])

    prompt = PromptTemplate.from_template(AGENT_PROMPT_FOR_CLAUDE)
    stop = ["<observation>"]

    generation_params = generation_config or GenerationParamsModel(
        max_tokens=DEFAULT_GENERATION_CONFIG["max_tokens"],
        top_k=DEFAULT_GENERATION_CONFIG["top_k"],
        top_p=DEFAULT_GENERATION_CONFIG["top_p"],
        temperature=DEFAULT_GENERATION_CONFIG["temperature"],
        stop_sequences=DEFAULT_GENERATION_CONFIG["stop_sequences"],
    )
    # Overwrite the default generation config with the stop sequences.
    # NOTE(review): when `generation_config` is supplied this mutates the
    # caller's object in place -- confirm callers do not reuse it afterwards.
    generation_params.stop_sequences = stop

    llm = BedrockLLM.from_model(model=model, generation_params=generation_params)
    output_parser = ReActSingleInputOutputParser()

    prompt_partial = prompt.partial(
        tools=TOOLS_PROMPT, tool_names=", ".join([t.name for t in tools])
    )

    agent = (
        RunnablePassthrough.assign(
            agent_scratchpad=lambda x: format_log_to_str(x["intermediate_steps"]),
        )
        | prompt_partial
        | llm
        | output_parser
    )
    return agent  # type: ignore


class AgentExecutor(Chain):
    """Agent that is using tools."""

    agent: BaseSingleActionAgent
    """The agent to run for creating a plan and determining actions
    to take at each step of the execution loop."""
    tools: Sequence[BaseTool]
    """The valid tools the agent can call."""
    return_intermediate_steps: bool = False
    """Whether to return the agent's trajectory of intermediate steps
    at the end in addition to the final output."""
    max_iterations: Optional[int] = MAX_ITERATIONS
    """The maximum number of steps to take before ending the execution
    loop.

    Setting to 'None' could lead to an infinite loop."""
    max_execution_time: Optional[float] = None
    """The maximum amount of wall clock time to spend in the execution
    loop.
    """
    early_stopping_method: str = "force"
    """The method to use for early stopping if the agent never
    returns `AgentFinish`.

    `"force"` returns a string saying that it stopped because it met a
    time or iteration limit. It is the only value accepted by
    `BaseSingleActionAgent.return_stopped_response` in this module; any
    other value makes that method raise `ValueError`.
    """
    handle_parsing_errors: Union[
        bool, str, Callable[[OutputParserException], str]
    ] = False
    """How to handle errors raised by the agent's output parser.
    Defaults to `False`, which raises the error.
    If `True`, the error will be sent back to the LLM as an observation.
    If a string, the string itself will be sent to the LLM as an observation.
    If a callable function, the function will be called with the exception
    as an argument, and the result of that function will be passed to the agent
    as an observation.
    """
    # A positive int keeps only the last N steps; a callable receives the full
    # list and returns the steps to keep; anything else disables trimming
    # (see `_prepare_intermediate_steps`).
    trim_intermediate_steps: Union[
        int,
        Callable[[list[tuple[AgentAction, str]]], list[tuple[AgentAction, str]]],
    ] = -1

    @root_validator(pre=True)
    def validate_runnable_agent(cls, values: dict) -> dict:
        """Convert runnable to agent if passed in.

        Raises:
            NotImplementedError: If the runnable's output type indicates a
                multi-action agent.
        """
        # `.get` so a missing field is reported by regular field validation
        # instead of surfacing as a bare KeyError from this pre-validator.
        agent = values.get("agent")
        if not isinstance(agent, Runnable):
            return values

        try:
            output_type = agent.OutputType
        except Exception:
            # The output type cannot always be inferred; assume single-action.
            multi_action = False
        else:
            multi_action = output_type == Union[list[AgentAction], AgentFinish]

        stream_runnable = values.pop("stream_runnable", True)
        if multi_action:
            raise NotImplementedError(
                "RunnableMultiActionAgent is not supported in AgentExecutor. "
            )
        values["agent"] = RunnableAgent(
            runnable=agent, stream_runnable=stream_runnable
        )
        return values

    def iter(
        self,
        inputs: Any,
        callbacks: Callbacks = None,
        *,
        include_run_info: bool = False,
    ) -> AgentExecutorIterator:
        """Enables iteration over steps taken to reach final output."""
        return AgentExecutorIterator(
            self,
            inputs,
            callbacks,
            tags=self.tags,
            include_run_info=include_run_info,
        )

    @property
    def input_keys(self) -> list[str]:
        """Return the input keys.

        :meta private:
        """
        return self.agent.input_keys

    @property
    def output_keys(self) -> list[str]:
        """Return the singular output key.

        :meta private:
        """
        if self.return_intermediate_steps:
            return self.agent.return_values + ["intermediate_steps"]
        return self.agent.return_values

    def lookup_tool(self, name: str) -> BaseTool:
        """Lookup tool by name.

        Raises:
            KeyError: If no tool with that name is registered.
        """
        return {tool.name: tool for tool in self.tools}[name]

    def _should_continue(self, iterations: int, time_elapsed: float) -> bool:
        """Return False once the iteration or wall-clock budget is exhausted."""
        if self.max_iterations is not None and iterations >= self.max_iterations:
            return False
        if (
            self.max_execution_time is not None
            and time_elapsed >= self.max_execution_time
        ):
            return False
        return True

    def _return(
        self,
        output: AgentFinish,
        intermediate_steps: list,
        run_manager: Optional[CallbackManagerForChainRun] = None,
    ) -> dict[str, Any]:
        """Notify callbacks of the finish and build the chain's output dict.

        The dict returned is ``output.return_values`` itself, with
        ``"intermediate_steps"`` added when ``return_intermediate_steps`` is set.
        """
        if run_manager:
            run_manager.on_agent_finish(output, color="green", verbose=self.verbose)
        final_output = output.return_values
        if self.return_intermediate_steps:
            final_output["intermediate_steps"] = intermediate_steps
        return final_output

    async def _areturn(
        self,
        output: AgentFinish,
        intermediate_steps: list,
        run_manager: Optional[AsyncCallbackManagerForChainRun] = None,
    ) -> dict[str, Any]:
        """Async counterpart of ``_return``."""
        if run_manager:
            await run_manager.on_agent_finish(
                output, color="green", verbose=self.verbose
            )
        final_output = output.return_values
        if self.return_intermediate_steps:
            final_output["intermediate_steps"] = intermediate_steps
        return final_output

    def _consume_next_step(
        self, values: NextStepOutput
    ) -> Union[AgentFinish, list[tuple[AgentAction, str]]]:
        """Collapse one step's emitted items into a finish or executed steps."""
        if isinstance(values[-1], AgentFinish):
            # A finish is always emitted alone by `_iter_next_step`.
            assert len(values) == 1
            return values[-1]
        return [(a.action, a.observation) for a in values if isinstance(a, AgentStep)]

    def _take_next_step(
        self,
        name_to_tool_map: dict[str, BaseTool],
        color_mapping: dict[str, str],
        inputs: dict[str, str],
        intermediate_steps: list[tuple[AgentAction, str]],
        run_manager: Optional[CallbackManagerForChainRun] = None,
    ) -> Union[AgentFinish, list[tuple[AgentAction, str]]]:
        """Run one full step and return either the finish or the new steps."""
        return self._consume_next_step(
            list(
                self._iter_next_step(
                    name_to_tool_map,
                    color_mapping,
                    inputs,
                    intermediate_steps,
                    run_manager,
                )
            )
        )

    def _iter_next_step(
        self,
        name_to_tool_map: dict[str, BaseTool],
        color_mapping: dict[str, str],
        inputs: dict[str, str],
        intermediate_steps: list[tuple[AgentAction, str]],
        run_manager: Optional[CallbackManagerForChainRun] = None,
    ) -> Iterator[Union[AgentFinish, AgentAction, AgentStep]]:
        """Take a single step in the thought-action-observation loop.

        Override this to take control of how the agent makes and acts on choices.

        Yields:
            Either a single ``AgentFinish``; or each chosen ``AgentAction``
            followed by the ``AgentStep`` produced by running it; or, when a
            parsing error is being handled, a single ``AgentStep`` carrying
            the error observation.

        Raises:
            ValueError: On a parsing error when ``handle_parsing_errors`` is
                ``False``, or when it has an unsupported type.
        """
        try:
            intermediate_steps = self._prepare_intermediate_steps(intermediate_steps)

            # Call the LLM to see what to do.
            output = self.agent.plan(
                intermediate_steps,
                callbacks=run_manager.get_child() if run_manager else None,
                **inputs,
            )
        except OutputParserException as e:
            if isinstance(self.handle_parsing_errors, bool):
                raise_error = not self.handle_parsing_errors
            else:
                raise_error = False
            if raise_error:
                # Chain the parser exception so its traceback is preserved.
                raise ValueError(
                    "An output parsing error occurred. "
                    "In order to pass this error back to the agent and have it try "
                    "again, pass `handle_parsing_errors=True` to the AgentExecutor. "
                    f"This is the error: {str(e)}"
                ) from e
            text = str(e)
            if isinstance(self.handle_parsing_errors, bool):
                if e.send_to_llm:
                    observation = str(e.observation)
                    text = str(e.llm_output)
                else:
                    observation = "Invalid or incomplete response"
            elif isinstance(self.handle_parsing_errors, str):
                observation = self.handle_parsing_errors
            elif callable(self.handle_parsing_errors):
                observation = self.handle_parsing_errors(e)
            else:
                raise ValueError("Got unexpected type of `handle_parsing_errors`")
            # Feed the error back to the model as a synthetic tool call.
            output = AgentAction("_Exception", observation, text)
            if run_manager:
                run_manager.on_agent_action(output, color="green")
            tool_run_kwargs = self.agent.tool_run_logging_kwargs()
            observation = ExceptionTool().run(
                output.tool_input,
                verbose=self.verbose,
                color=None,
                callbacks=run_manager.get_child() if run_manager else None,
                **tool_run_kwargs,
            )
            yield AgentStep(action=output, observation=observation)
            return

        # If the tool chosen is the finishing tool, then we end and return.
        if isinstance(output, AgentFinish):
            yield output
            return

        actions: list[AgentAction]
        if isinstance(output, AgentAction):
            actions = [output]
        else:
            actions = output
        # Announce every action first, then execute them in order.
        for agent_action in actions:
            yield agent_action
        for agent_action in actions:
            yield self._perform_agent_action(
                name_to_tool_map, color_mapping, agent_action, run_manager
            )

    def _perform_agent_action(
        self,
        name_to_tool_map: dict[str, BaseTool],
        color_mapping: dict[str, str],
        agent_action: AgentAction,
        run_manager: Optional[CallbackManagerForChainRun] = None,
    ) -> AgentStep:
        """Run the tool requested by ``agent_action`` and wrap the observation.

        Unknown tool names are routed to ``InvalidTool`` so the model is told
        which tool names are available instead of the run failing.
        """
        if run_manager:
            run_manager.on_agent_action(agent_action, color="green")
        # Otherwise we lookup the tool
        if agent_action.tool in name_to_tool_map:
            tool = name_to_tool_map[agent_action.tool]
            return_direct = tool.return_direct
            color = color_mapping[agent_action.tool]
            tool_run_kwargs = self.agent.tool_run_logging_kwargs()
            if return_direct:
                tool_run_kwargs["llm_prefix"] = ""
            # We then call the tool on the tool input to get an observation.
            # The original langchain implementation cannot handle multiple
            # inputs, so we need to convert the input to a dict.
            tool_input = agent_action.tool_input
            logger.info("tool_input: %s", tool_input)
            if isinstance(tool_input, str):
                try:
                    tool_input = json.loads(tool_input)
                except json.JSONDecodeError:
                    # Best effort: non-JSON input is passed through as-is.
                    pass
            observation = tool.run(
                tool_input,
                verbose=self.verbose,
                color=color,
                callbacks=run_manager.get_child() if run_manager else None,
                **tool_run_kwargs,
            )
            if isinstance(tool, AnswerWithKnowledgeTool):
                # If the tool is AnswerWithKnowledgeTool, we need to extract
                # the output.
                observation = observation["output"]
        else:
            tool_run_kwargs = self.agent.tool_run_logging_kwargs()
            observation = InvalidTool().run(
                {
                    "requested_tool_name": agent_action.tool,
                    "available_tool_names": list(name_to_tool_map.keys()),
                },
                verbose=self.verbose,
                color=None,
                callbacks=run_manager.get_child() if run_manager else None,
                **tool_run_kwargs,
            )
        return AgentStep(action=agent_action, observation=observation)

    async def _atake_next_step(
        self,
        name_to_tool_map: dict[str, BaseTool],
        color_mapping: dict[str, str],
        inputs: dict[str, str],
        intermediate_steps: list[tuple[AgentAction, str]],
        run_manager: Optional[AsyncCallbackManagerForChainRun] = None,
    ) -> Union[AgentFinish, list[tuple[AgentAction, str]]]:
        """Async step execution is not implemented."""
        raise NotImplementedError()

    async def _aiter_next_step(
        self,
        name_to_tool_map: dict[str, BaseTool],
        color_mapping: dict[str, str],
        inputs: dict[str, str],
        intermediate_steps: list[tuple[AgentAction, str]],
        run_manager: Optional[AsyncCallbackManagerForChainRun] = None,
    ) -> AsyncIterator[Union[AgentFinish, AgentAction, AgentStep]]:
        """Take a single step in the thought-action-observation loop.

        Override this to take control of how the agent makes and acts on choices.
        Not implemented in this class.
        """
        raise NotImplementedError()

    async def _aperform_agent_action(
        self,
        name_to_tool_map: dict[str, BaseTool],
        color_mapping: dict[str, str],
        agent_action: AgentAction,
        run_manager: Optional[AsyncCallbackManagerForChainRun] = None,
    ) -> AgentStep:
        """Async tool execution is not implemented."""
        raise NotImplementedError()

    def _call(
        self,
        inputs: dict[str, str],
        run_manager: Optional[CallbackManagerForChainRun] = None,
    ) -> dict[str, Any]:
        """Run text through and get agent response.

        Loops over agent steps until the agent finishes, a ``return_direct``
        tool produces an observation, or the iteration/time budget runs out
        (in which case the agent's stopped response is returned).
        """
        # Construct a mapping of tool name to tool for easy lookup
        name_to_tool_map = {tool.name: tool for tool in self.tools}
        # We construct a mapping from each tool to a color, used for logging.
        color_mapping = get_color_mapping(
            [tool.name for tool in self.tools], excluded_colors=["green", "red"]
        )
        intermediate_steps: list[tuple[AgentAction, str]] = []
        # Let's start tracking the number of iterations and time elapsed
        iterations = 0
        time_elapsed = 0.0
        start_time = time.time()
        # We now enter the agent loop (until it returns something).
        while self._should_continue(iterations, time_elapsed):
            next_step_output = self._take_next_step(
                name_to_tool_map,
                color_mapping,
                inputs,
                intermediate_steps,
                run_manager=run_manager,
            )
            if isinstance(next_step_output, AgentFinish):
                return self._return(
                    next_step_output,
                    intermediate_steps,
                    run_manager=run_manager,
                )

            intermediate_steps.extend(next_step_output)
            if len(next_step_output) == 1:
                next_step_action = next_step_output[0]
                # See if tool should return directly
                tool_return = self._get_tool_return(next_step_action)
                if tool_return is not None:
                    return self._return(
                        tool_return, intermediate_steps, run_manager=run_manager
                    )
            iterations += 1
            time_elapsed = time.time() - start_time
        output = self.agent.return_stopped_response(
            self.early_stopping_method, intermediate_steps, **inputs
        )
        return self._return(output, intermediate_steps, run_manager=run_manager)

    async def _acall(
        self,
        inputs: dict[str, str],
        run_manager: Optional[AsyncCallbackManagerForChainRun] = None,
    ) -> dict[str, str]:
        """Run text through and get agent response (async; not implemented)."""
        raise NotImplementedError()

    def _get_tool_return(
        self, next_step_output: tuple[AgentAction, str]
    ) -> Optional[AgentFinish]:
        """Check if the tool is a returning tool.

        Returns:
            An ``AgentFinish`` wrapping the observation when the executed tool
            is known and has ``return_direct`` set; otherwise ``None``.
        """
        agent_action, observation = next_step_output
        name_to_tool_map = {tool.name: tool for tool in self.tools}
        # Fall back to "output" when the agent declares no return keys.
        return_value_key = "output"
        if self.agent.return_values:
            return_value_key = self.agent.return_values[0]
        # Invalid tools won't be in the map, so we return None for them.
        if agent_action.tool in name_to_tool_map:
            if name_to_tool_map[agent_action.tool].return_direct:
                return AgentFinish(
                    {return_value_key: observation},
                    "",
                )
        return None

    def _prepare_intermediate_steps(
        self, intermediate_steps: list[tuple[AgentAction, str]]
    ) -> list[tuple[AgentAction, str]]:
        """Apply ``trim_intermediate_steps`` to the step history."""
        if (
            isinstance(self.trim_intermediate_steps, int)
            and self.trim_intermediate_steps > 0
        ):
            return intermediate_steps[-self.trim_intermediate_steps :]
        if callable(self.trim_intermediate_steps):
            return self.trim_intermediate_steps(intermediate_steps)
        return intermediate_steps

    def stream(
        self,
        input: Union[dict[str, Any], Any],
        config: Optional[RunnableConfig] = None,
        **kwargs: Any,
    ) -> Iterator[AddableDict]:
        """Enables streaming over steps taken to reach final output."""
        config = ensure_config(config)
        iterator = AgentExecutorIterator(
            self,
            input,
            config.get("callbacks"),
            tags=config.get("tags"),
            metadata=config.get("metadata"),
            run_name=config.get("run_name"),
            run_id=config.get("run_id"),
            yield_actions=True,
            **kwargs,
        )
        yield from iterator

    # async def astream(
    #     self,
    #     input: Union[dict[str, Any], Any],
    #     config: Optional[RunnableConfig] = None,
    #     **kwargs: Any,
    # ) -> AsyncIterator[AddableDict]:
    #     """Enables streaming over steps taken to reach final output."""
    #     raise NotImplementedError()