# Plan-and-Execute
---

### What is Plan-and-Execute?
The Plan-and-Execute framework is a strategy for retrieval-augmented generation (RAG) that divides complex reasoning tasks into two distinct phases: planning and execution. Whereas a traditional ReAct agent reasons one step at a time, Plan-and-Execute commits to an explicit, multi-step plan up front.

- **Planning Phase**: The model generates a high-level plan or structured outline that serves as a roadmap for solving the task. This phase ensures that the execution is systematic and adheres to the task's requirements.
- **Execution Phase**: Based on the generated plan, the model retrieves relevant information and executes the outlined steps to provide a detailed and coherent response.

This separation aims to address a limitation of RAG systems that attempt reasoning and generation in a single step, which often leads to logical errors or inefficient handling of complex tasks.
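
As a rough illustration of the two phases, the sketch below uses plain Python with stubbed functions in place of LLM calls. `make_plan`, `run_step`, and `plan_and_execute` are hypothetical names introduced for this example only; they are not part of LangGraph or of this notebook's pipeline.

```python
from typing import Callable, List, Tuple


def make_plan(objective: str) -> List[str]:
    """Hypothetical planner stub: a real planner would call an LLM."""
    return [f"Search for information about: {objective}", "Summarize the findings"]


def run_step(step: str) -> str:
    """Hypothetical executor stub: a real executor would call tools or an LLM."""
    return f"done: {step}"


def plan_and_execute(
    objective: str,
    planner: Callable[[str], List[str]] = make_plan,
    executor: Callable[[str], str] = run_step,
) -> List[Tuple[str, str]]:
    # Planning phase: produce the full list of steps up front.
    plan = planner(objective)
    # Execution phase: work through the steps in order, recording each result.
    return [(step, executor(step)) for step in plan]


history = plan_and_execute("Microsoft AutoGen")
for step, result in history:
    print(f"{step} -> {result}")
```

The pipeline built later in this notebook goes one step further than this sketch: after each executed step it asks the model to revise the remaining plan.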

### Key Advantages
- **Improved Task Decomposition**: By explicitly separating planning and execution, the framework enables better handling of complex, multi-step reasoning tasks, ensuring systematic progress towards the solution.
- **Higher Accuracy and Coherence**: The planning phase acts as a guide, reducing the chances of errors and improving the logical coherence of the responses generated during execution.

**References**
- [ReAct paper](https://arxiv.org/abs/2210.03629)
- [Plan-and-Solve paper](https://arxiv.org/abs/2305.04091)
- [Baby-AGI project](https://github.com/yoheinakajima/babyagi)

In [None]:
import os
from dotenv import load_dotenv
from azure_genai_utils.tracer import get_langchain_api_key, set_langsmith

load_dotenv(override=True)

# To trace your RAG API calls, set tracing=True. This requires a valid LangChain (LangSmith) API key.
langchain_key, has_langchain_key = get_langchain_api_key()
set_langsmith("[RAG Innv Lab] 1_Agentic-Design-Pattern", tracing=False)

azure_openai_chat_deployment_name = os.getenv("AZURE_OPENAI_CHAT_DEPLOYMENT_NAME")
azure_openai_embedding_deployment_name = os.getenv("AZURE_OPENAI_EMBEDDING_DEPLOYMENT_NAME")

In [None]:
# LANGUAGE = "English"
# LOCALE = "en-US"
LANGUAGE = "Korean"
LOCALE = "ko-KR"

language_prompt = f" Answer in {LANGUAGE}."

<br>

## 🧪 Step 1. Test and Construct each module
---

Before building the entire graph pipeline, we will construct and test each module separately.

### Web Search Tool

The web search tool supplies additional context for the agent.

In [None]:
from azure_genai_utils.tools import BingSearch

WEB_SEARCH_FORMAT_OUTPUT = False

web_search_tool = BingSearch(
    max_results=2,
    locale=LOCALE,
    include_news=False,
    include_entity=False,
    format_output=WEB_SEARCH_FORMAT_OUTPUT,
)
tools = [web_search_tool]

In [None]:
question = "Microsoft AutoGen"
results = web_search_tool.invoke({"query": question})
print(results[0])

### Define your LLM

This hands-on lab uses only `gpt-4o`, but you can use multiple models in the pipeline.

In [None]:
from langchain_openai import AzureChatOpenAI

model_name = azure_openai_chat_deployment_name
llm = AzureChatOpenAI(model=model_name, temperature=0)

### Execution Agent


In [None]:
from langgraph.prebuilt import create_react_agent
from langchain_core.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            f"You are a helpful assistant. {language_prompt}",
        ),
        # A placeholder expands into the full message list (including tool calls and tool results).
        ("placeholder", "{messages}"),
    ]
)

# Create the ReAct agent
agent_executor = create_react_agent(llm, tools, prompt=prompt)

In [None]:
agent_executor.invoke({"messages": [("user", "Where is Seoul?")]})

### Plan Phase
Now let's consider how to create a **planning phase**, where we use function calling to create a plan.

In [None]:
from pydantic import BaseModel, Field
from langchain_core.prompts import ChatPromptTemplate
from typing import Annotated, List


class Plan(BaseModel):
    """Sorted steps to execute the plan"""

    # Field(description=...) puts the description into the schema sent to the model.
    steps: List[str] = Field(
        description="Different steps to follow, should be in sorted order"
    )


planner_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            f"""For the given objective, come up with a simple step by step plan. \
This plan should involve individual tasks, that if executed correctly will yield the correct answer. Do not add any superfluous steps. \
The result of the final step should be the final answer. Make sure that each step has all the information needed - do not skip steps.
{language_prompt}""",
        ),
        ("placeholder", "{messages}"),
    ]
)

planner = planner_prompt | llm.with_structured_output(Plan)

In [None]:
planner.invoke(
    {"messages": [("user", "What is the benefit of using Microsoft AutoGen?")]}
)

### Re-plan Phase

In [None]:
from typing import Union


class Response(BaseModel):
    """Response to user."""

    response: str


class Act(BaseModel):
    """Action to perform."""

    # Use "Response" if you want to respond to user, use "Plan" if you need to further use tools to get the answer.
    action: Union[Response, Plan] = Field(
        description="Action to perform. If you want to respond to user, use 'Response'. "
        "If you need to further use tools to get the answer, use 'Plan'."
    )


replanner_prompt = ChatPromptTemplate.from_template(
    """For the given objective, come up with a simple step by step plan. \
This plan should involve individual tasks, that if executed correctly will yield the correct answer. Do not add any superfluous steps. \
The result of the final step should be the final answer. Make sure that each step has all the information needed - do not skip steps.

Your objective was this:
{input}

Your original plan was this:
{plan}

You have currently done the following steps:
{past_steps}

Update your plan accordingly. If no more steps are needed and you can return to the user, then respond with that. Otherwise, fill out the plan. Only add steps to the plan that still NEED to be done. Do not return previously done steps as part of the plan.
"""
)

replanner = replanner_prompt | llm.with_structured_output(Act)

In [None]:
replanner.invoke(
    {
        "input": "What is the benefit of using Microsoft AutoGen?",
        "plan": "Step 1: Search for the benefits of Microsoft AutoGen.",
        "past_steps": "",
    }
)

<br>

## 🧪 Step 2. Define the Graph
---

### State Definition

- `input`: User input
- `plan`: Current plan
- `past_steps`: Past steps and their results
- `response`: Final response

In [None]:
import operator
from typing import Annotated, List, Tuple
from typing_extensions import TypedDict


class PlanExecute(TypedDict):
    input: Annotated[str, "User's input"]
    plan: Annotated[List[str], "Current plan"]
    past_steps: Annotated[List[Tuple], operator.add]
    response: Annotated[str, "Final response"]
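
The `operator.add` annotation on `past_steps` acts as a reducer in LangGraph: the value a node returns for that key is combined with the existing value instead of replacing it, while keys without a reducer are overwritten. The sketch below imitates that merge in plain Python. `apply_update` and `REDUCERS` are hypothetical names written for this illustration, not LangGraph APIs.

```python
import operator
from typing import Any, Callable, Dict

# Keys with a reducer are combined; all other keys are overwritten.
REDUCERS: Dict[str, Callable[[Any, Any], Any]] = {"past_steps": operator.add}


def apply_update(state: Dict[str, Any], update: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper that mimics how a node's update is merged into the state."""
    merged = dict(state)
    for key, value in update.items():
        if key in REDUCERS and key in merged:
            merged[key] = REDUCERS[key](merged[key], value)
        else:
            merged[key] = value
    return merged


state = {"input": "Where is Seoul?", "plan": ["step A", "step B"], "past_steps": []}
state = apply_update(state, {"past_steps": [("step A", "result A")]})
state = apply_update(state, {"plan": ["step B"]})
state = apply_update(state, {"past_steps": [("step B", "result B")]})

print(state["plan"])        # the plan was replaced
print(state["past_steps"])  # the history was appended to
```

This is why a node can return only the newest `(task, result)` pair in a one-element list and still end up with the full history in the state.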

### Define Nodes

We will define the following node functions and one routing function for the graph:

- `plan_step`: Plan the steps to execute the given task.
- `execute_step`: Execute the first step of the current plan using the agent executor.
- `replan_step`: Replan from previous steps or get the final response.
- `should_end`: Routing function for the conditional edge (not a node); checks whether the agent should end execution or continue.
- `generate_final_report`: Generate a final report based on the execution results.

In [None]:
from langchain_core.output_parsers import StrOutputParser


def plan_step(state: PlanExecute):
    """Plan the steps to execute the given task."""
    plan = planner.invoke({"messages": [("user", state["input"])]})
    return {"plan": plan.steps}


def execute_step(state: PlanExecute):
    """Execute the given task using Agent executor."""
    plan = state["plan"]
    # Each step is numbered
    plan_str = "\n".join(f"{i+1}. {step}" for i, step in enumerate(plan))
    task = plan[0]
    task_formatted = f"""For the following plan:
{plan_str}\n\nYou are tasked with executing [step 1. {task}]."""
    agent_response = agent_executor.invoke({"messages": [("user", task_formatted)]})

    return {
        "past_steps": [(task, agent_response["messages"][-1].content)],
    }


def replan_step(state: PlanExecute):
    """Replan from previous steps or get final response."""
    output = replanner.invoke(state)

    if isinstance(output.action, Response):
        return {"response": output.action.response}
    else:
        next_plan = output.action.steps
        if len(next_plan) == 0:
            return {"response": "No more steps needed."}
        else:
            return {"plan": next_plan}


def should_end(state: PlanExecute):
    """Check if the agent should end execution or continue."""
    if "response" in state and state["response"]:
        return "final_report"
    else:
        return "execute"


final_report_prompt = ChatPromptTemplate.from_template(
    """You are given the objective and the previously completed steps. Your task is to generate a final report in markdown format.
The final report should be written in a professional tone.

Your objective was this:

{input}

Your previously completed steps (question and answer pairs):

{past_steps}
"""
)

final_report = final_report_prompt | llm | StrOutputParser()


def generate_final_report(state: PlanExecute):
    """Generate a final report based on the execution results."""
    past_steps = "\n\n".join(
        [
            f"Question: {past_step[0]}\n\nAnswer: {past_step[1]}\n\n####"
            for past_step in state["past_steps"]
        ]
    )
    response = final_report.invoke({"input": state["input"], "past_steps": past_steps})
    return {"response": response}
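
To see how the routing decision behaves, the check below copies the `should_end` logic into a standalone function and runs it on a few hand-written states. The function is renamed `route` here (a name chosen for this example) so that it does not shadow the notebook's definition, and the sample states are made up.

```python
from typing import Any, Dict


def route(state: Dict[str, Any]) -> str:
    """Standalone copy of the should_end logic, for illustration only."""
    if "response" in state and state["response"]:
        return "final_report"
    return "execute"


# No response key yet: keep executing.
print(route({"plan": ["step B"]}))
# An empty response is falsy, so execution also continues.
print(route({"plan": ["step B"], "response": ""}))
# A non-empty response sends the graph to the final report node.
print(route({"plan": [], "response": "Seoul is the capital of South Korea."}))
```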

### Construct the Graph

In [None]:
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver

workflow = StateGraph(PlanExecute)

# Node definitions
workflow.add_node("planner", plan_step)
workflow.add_node("execute", execute_step)
workflow.add_node("replan", replan_step)
workflow.add_node("final_report", generate_final_report)

# Edge connections
workflow.add_edge(START, "planner")
workflow.add_edge("planner", "execute")
workflow.add_edge("execute", "replan")
workflow.add_edge("final_report", END)
workflow.add_conditional_edges(
    "replan",
    should_end,
    {"execute": "execute", "final_report": "final_report"},
)

# Compile the workflow
app = workflow.compile(checkpointer=MemorySaver())
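
The control flow wired up above can be traced without any LLM calls. The sketch below is a plain-Python approximation of the same topology. It assumes stub node functions (`fake_plan`, `fake_execute`, `fake_replan`, and `fake_report` are all hypothetical) and uses a simple step counter as a stand-in for a recursion limit. It does not use LangGraph itself.

```python
from typing import Any, Dict, List


def fake_plan(state: Dict[str, Any]) -> Dict[str, Any]:
    return {"plan": ["look up A", "look up B"]}


def fake_execute(state: Dict[str, Any]) -> Dict[str, Any]:
    # Execute only the first step of the current plan and record its result.
    task = state["plan"][0]
    return {"past_steps": state["past_steps"] + [(task, f"result of {task}")]}


def fake_replan(state: Dict[str, Any]) -> Dict[str, Any]:
    # Drop the step that was just executed; respond when nothing is left.
    remaining = state["plan"][1:]
    if remaining:
        return {"plan": remaining}
    return {"response": "No more steps needed."}


def fake_report(state: Dict[str, Any]) -> Dict[str, Any]:
    return {"response": f"Report covering {len(state['past_steps'])} steps"}


NODES = {
    "planner": fake_plan,
    "execute": fake_execute,
    "replan": fake_replan,
    "final_report": fake_report,
}
# Fixed edges; "replan" is routed conditionally inside run().
EDGES = {"planner": "execute", "execute": "replan", "final_report": "END"}


def run(state: Dict[str, Any], max_steps: int = 50) -> List[str]:
    visited: List[str] = []
    node = "planner"
    while node != "END":
        if len(visited) >= max_steps:
            raise RuntimeError("step limit reached")
        visited.append(node)
        state.update(NODES[node](state))
        if node == "replan":
            node = "final_report" if state.get("response") else "execute"
        else:
            node = EDGES[node]
    return visited


state = {"input": "demo", "past_steps": []}
trace = run(state)
print(" -> ".join(trace))
```

Each pass through `execute` and `replan` counts as two visits, so a long plan can use up a step budget quickly. This is one reason to set the limit generously when running the real graph.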

### Visualize the graph

In [None]:
from azure_genai_utils.graphs import visualize_langgraph

visualize_langgraph(app, xray=True)

<br>

## 🧪 Step 3. Execute the Graph
---

### Execute the graph

In [None]:
from azure_genai_utils.messages import invoke_graph, random_uuid
from langchain_core.runnables import RunnableConfig

config = RunnableConfig(recursion_limit=50, configurable={"thread_id": random_uuid()})

inputs = {
    "input": f"Explain the main differences between Microsoft AutoGen and LangGraph. {language_prompt}"
}

invoke_graph(app, inputs, config)

In [None]:
from IPython.display import Markdown

snapshot = app.get_state(config).values
Markdown(snapshot["response"])