In [None]:
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Task Planner Agent

<table align="left">
  <td style="text-align: center">
    <a href="https://colab.research.google.com/github/GoogleCloudPlatform/generative-ai/blob/main/gemini/agents/genai-experience-concierge/agent-design-patterns/task-planner.ipynb">
      <img width="32px" src="https://www.gstatic.com/pantheon/images/bigquery/welcome_page/colab-logo.svg" alt="Google Colaboratory logo"><br> Open in Colab
    </a>
  </td>
  <td style="text-align: center">
    <a href="https://console.cloud.google.com/vertex-ai/colab/import/https:%2F%2Fraw.githubusercontent.com%2FGoogleCloudPlatform%2Fgenerative-ai%2Fmain%2Fgemini%2Fagents%2Fgenai-experience-concierge%2Fagent-design-patterns%2Ftask-planner.ipynb">
      <img width="32px" src="https://lh3.googleusercontent.com/JmcxdQi-qOpctIvWKgPtrzZdJJK-J3sWE1RsfjZNwshCFgE_9fULcNpuXYTilIR2hjwN" alt="Google Cloud Colab Enterprise logo"><br> Open in Colab Enterprise
    </a>
  </td>
  <td style="text-align: center">
    <a href="https://console.cloud.google.com/vertex-ai/workbench/deploy-notebook?download_url=https://raw.githubusercontent.com/GoogleCloudPlatform/generative-ai/main/gemini/agents/genai-experience-concierge/agent-design-patterns/task-planner.ipynb">
      <img src="https://www.gstatic.com/images/branding/gcpiconscolors/vertexai/v1/32px.svg" alt="Vertex AI logo"><br> Open in Vertex AI Workbench
    </a>
  </td>
  <td style="text-align: center">
    <a href="https://github.com/GoogleCloudPlatform/generative-ai/blob/main/gemini/agents/genai-experience-concierge/agent-design-patterns/task-planner.ipynb">
      <img width="32px" src="https://www.svgrepo.com/download/217753/github.svg" alt="GitHub logo"><br> View on GitHub
    </a>
  </td>
</table>

<div style="clear: both;"></div>

<b>Share to:</b>

<a href="https://www.linkedin.com/sharing/share-offsite/?url=https%3A//github.com/GoogleCloudPlatform/generative-ai/blob/main/gemini/agents/genai-experience-concierge/agent-design-patterns/task-planner.ipynb" target="_blank">
  <img width="20px" src="https://upload.wikimedia.org/wikipedia/commons/8/81/LinkedIn_icon.svg" alt="LinkedIn logo">
</a>

<a href="https://bsky.app/intent/compose?text=https%3A//github.com/GoogleCloudPlatform/generative-ai/blob/main/gemini/agents/genai-experience-concierge/agent-design-patterns/task-planner.ipynb" target="_blank">
  <img width="20px" src="https://upload.wikimedia.org/wikipedia/commons/7/7a/Bluesky_Logo.svg" alt="Bluesky logo">
</a>

<a href="https://twitter.com/intent/tweet?url=https%3A//github.com/GoogleCloudPlatform/generative-ai/blob/main/gemini/agents/genai-experience-concierge/agent-design-patterns/task-planner.ipynb" target="_blank">
  <img width="20px" src="https://upload.wikimedia.org/wikipedia/commons/5/5a/X_icon_2.svg" alt="X logo">
</a>

<a href="https://reddit.com/submit?url=https%3A//github.com/GoogleCloudPlatform/generative-ai/blob/main/gemini/agents/genai-experience-concierge/agent-design-patterns/task-planner.ipynb" target="_blank">
  <img width="20px" src="https://redditinc.com/hubfs/Reddit%20Inc/Brand/Reddit_Logo.png" alt="Reddit logo">
</a>

<a href="https://www.facebook.com/sharer/sharer.php?u=https%3A//github.com/GoogleCloudPlatform/generative-ai/blob/main/gemini/agents/genai-experience-concierge/agent-design-patterns/task-planner.ipynb" target="_blank">
  <img width="20px" src="https://upload.wikimedia.org/wikipedia/commons/5/51/Facebook_f_logo_%282019%29.svg" alt="Facebook logo">
</a>

| | |
|-|-|
|Author(s) | [Pablo Gaeta](https://github.com/pablofgaeta) |

## Overview

### Introduction

This notebook demonstrates an implementation of a task planner agent (similar to ["Deep Research"](https://gemini.google/overview/deep-research)). This is a multi-agent architecture useful for tasks requiring more complex reasoning, planning, and multi-tool use.

This architecture is often much slower than single-agent designs because a single turn can consist of a large number of LLM calls and tool usage. This demo is particularly slow because the "Executor" agent only supports linear plans and executes each task sequentially. There is research on alternative approaches such as [LLM Compiler](https://arxiv.org/abs/2312.04511) that attempt to improve this design by constructing DAGs to enable parallel task execution.

The "Executor" agent in this demo is a Gemini model equipped with the Google Search Grounding Tool ([documentation](https://cloud.google.com/vertex-ai/generative-ai/docs/multimodal/ground-with-google-search)) to enable live web search while executing tasks.

### Key Components

The task planner agent is built around several key components:

* **Language Model:** Gemini is used for natural language understanding, function calling, and response generation for multiple agents.
* **State Management:** LangGraph manages the conversation flow and maintains the session state, including conversation history and generated/executed plans.
* **Planner Node:** Generates a plan or a direct response to the user input. The plan consists of a sequence of tasks to be executed.
* **Executor Node:** Executes the tasks defined in the plan, typically using tools like Google Search to gather information.
* **Reflector Node:** Analyzes the results of the executed plan and the user's input to determine the next action - either generating a new plan for further execution or formulating a final response to the user.

### Workflow

The agent operates through the following workflow:

1. The **Planner** receives user input and either (1) responds directly to simple queries (e.g. "Hi") or (2) generates a research plan, including a list of tasks to execute.
1. The **Executor** receives the plan and uses its tools to perform each task and update the plan with the executed task result.
1. The **Reflector** reviews the executed plan and either (1) generates a final response to the user or (2) generates a new plan and jumps back to step 2.

## Get Started

### Install dependencies

In [1]:
%pip install -q google-genai langgraph langgraph-checkpoint pydantic tenacity

### Restart runtime

To use the newly installed packages in this Jupyter runtime, you must restart the runtime. You can do this by uncommenting and running the cell below, which restarts the current kernel.

The restart might take a minute or longer. After it's restarted, continue to the next step.

In [None]:
# import IPython

# app = IPython.Application.instance()
# app.kernel.do_shutdown(True)

### Authenticate your notebook environment (Colab only)

If you're running this notebook on Google Colab, run the cell below to authenticate your environment.

In [None]:
import sys

# Only authenticate when the `google.colab` package is already loaded, which
# this notebook uses as its signal for running inside Google Colab. In any
# other environment the cell is a no-op.
if "google.colab" in sys.modules:
    # Imported lazily because the package is only importable inside Colab.
    from google.colab import auth

    auth.authenticate_user()

## Notebook parameters

In [2]:
# Use the environment variable if the user doesn't provide Project ID.
import os

PROJECT_ID = "[your-project-id]"  # @param {type: "string", placeholder: "[your-project-id]", isTemplate: true}
# Fall back to GOOGLE_CLOUD_PROJECT when the form field is empty or still holds
# the placeholder value.
# NOTE(review): if GOOGLE_CLOUD_PROJECT is unset, os.environ.get returns None
# and str(None) makes PROJECT_ID the literal string "None" rather than failing
# here — the problem then only surfaces when the first API call is made.
if not PROJECT_ID or PROJECT_ID == "[your-project-id]":
    PROJECT_ID = str(os.environ.get("GOOGLE_CLOUD_PROJECT"))

# Region plus one model name per agent role (planner, reflector, executor).
REGION = "us-central1"  # @param {type:"string"}
PLANNER_MODEL_NAME = "gemini-2.0-flash-001"  # @param {type:"string"}
REFLECTOR_MODEL_NAME = "gemini-2.0-flash-001"  # @param {type:"string"}
EXECUTOR_MODEL_NAME = "gemini-2.0-flash-001"  # @param {type:"string"}

## Define the Task Planner Agent

### Import dependencies

In [3]:
from collections.abc import AsyncGenerator
import datetime
from typing import Literal, TypedDict
import uuid

from IPython import display as ipd
from google import genai
from google.genai import errors as genai_errors
from google.genai import types as genai_types
from langchain_core.runnables import config as lc_config
from langgraph import graph
from langgraph import types as lg_types
from langgraph.checkpoint import memory
from langgraph.config import get_stream_writer
import pydantic
import requests
from tenacity import retry, retry_if_exception, stop_after_attempt, wait_exponential

### Define schemas

Defines all of the schemas, constants, and types required for building the agent.

In [4]:
# Agent config settings


class AgentConfig(pydantic.BaseModel):
    """Configuration settings for the agent, including project, region, and model details.

    Every field is a required string (none declares a default). The graph nodes
    in this notebook build an instance by validating the ``agent_config`` entry
    of the runnable config's ``configurable`` mapping.
    """

    project: str
    """The Google Cloud project ID."""
    # Forwarded as `location` when the core functions construct genai.Client.
    region: str
    """The Google Cloud region where the agent is deployed."""
    planner_model_name: str
    """The name of the Gemini model to use for planning."""
    executor_model_name: str
    """The name of the Gemini model to use for executing tasks."""
    reflector_model_name: str
    """The name of the Gemini model to use for reflecting on the plan and results."""


# Node names and literal types

# Each node has a runtime name (used as the `goto` target of a Command) and a
# matching Literal type (used in the node functions' return annotations). The
# Literal value repeats the name string, so each pair must be kept in sync by
# hand.
REFLECTOR_NODE_NAME = "REFLECTOR"
"""The name of the reflector node in the LangGraph."""
ReflectorNodeTargetLiteral = Literal["REFLECTOR"]
"""Literal type for the reflector node target."""

EXECUTOR_NODE_NAME = "EXECUTOR"
"""The name of the executor node in the LangGraph."""
ExecutorNodeTargetLiteral = Literal["EXECUTOR"]
"""Literal type for the executor node target."""

PLANNER_NODE_NAME = "PLANNER"
"""The name of the planner node in the LangGraph."""
PlannerNodeTargetLiteral = Literal["PLANNER"]
"""Literal type for the planner node target."""

POST_PROCESS_NODE_NAME = "POST_PROCESS"
"""The name of the post-processing node in the LangGraph."""
PostProcessNodeTargetLiteral = Literal["POST_PROCESS"]
"""Literal type for the post-processing node target."""

# NOTE(review): "__end__" presumably mirrors LangGraph's END sentinel value —
# confirm against langgraph.graph.END.
EndNodeTargetLiteral = Literal["__end__"]
"""Literal type for the end node target."""

# Gemini structured-output models (pydantic)


class Task(pydantic.BaseModel):
    """An individual task with a goal and result.

    ``result`` defaults to None for a freshly planned task; the plan executor
    in this notebook fills it in after running the task. Because this model is
    nested inside the ``PlanOrRespond`` response schema, the ``Field``
    descriptions below are part of the schema given to the Gemini model.
    """

    goal: str = pydantic.Field(
        description="The description and goal of this step in the plan.",
    )
    """The description and goal of this step in the plan."""

    # The description tells the model to leave this unset; the planner node
    # additionally resets it to None before execution.
    result: str | None = pydantic.Field(
        default=None,
        description="The result of this step determined by the plan executor. Always set this field to None",
    )
    """The result of this step determined by the plan executor. Always set this field to None."""


class Plan(pydantic.BaseModel):
    """A step-by-step sequential plan.

    Holds a high-level ``goal`` plus the ordered ``tasks`` that the executor
    runs one after another. Both fields are required.
    """

    goal: str = pydantic.Field(description="High level goal for plan to help user.")
    """High level goal for plan to help user."""
    # List order is the execution order used by the plan executor.
    tasks: list[Task] = pydantic.Field(
        description="A list of individual tasks that will be executed in sequence before responding to the user. As the task gets more complex, you can add more steps.",
    )
    """A list of individual tasks that will be executed in sequence before responding to the user. As the task gets more complex, you can add more steps."""


class Response(pydantic.BaseModel):
    """Response to send to the user.

    One of the two possible ``PlanOrRespond`` actions; choosing it ends the
    planning for the turn with a direct message instead of a new plan.
    """

    response: str
    """The response message to send to the user."""


class PlanOrRespond(pydantic.BaseModel):
    """Action to perform. Either respond to user or generate a research plan.

    Used as the ``response_schema`` for both the planner and reflector model
    calls, whose JSON output is parsed back into this model.
    """

    # Callers distinguish the two cases with isinstance checks on `action`.
    action: Response | Plan = pydantic.Field(
        description="The next action can either be a direct response to the user or generate a new plan if you need to think more and use tools."
    )
    """The next action can either be a direct response to the user or generate a new plan if you need to think more and use tools."""


# LangGraph models


class Turn(TypedDict, total=False):
    """
    Represents a single turn in a conversation.

    Declared with ``total=False``, so every key is optional; code in this
    notebook therefore reads values with ``.get()`` and checks for None.

    Attributes:
        id: Unique identifier for the turn.
        created_at: Timestamp of when the turn was created.
        user_input: The user's input in this turn.
        response: The agent's response in this turn, if any.
        plan: The agent's last generated plan for this turn, if any.
        messages: A list of Gemini content messages associated with this turn.
    """

    id: uuid.UUID
    """Unique identifier for the turn."""

    created_at: datetime.datetime
    """Timestamp of when the turn was created."""

    user_input: str
    """The user's input for this turn."""

    response: str
    """The agent's response for this turn, if any."""

    plan: Plan | None
    """The agent's last generated plan for this turn, if any."""

    messages: list[genai_types.Content]
    """List of Gemini Content objects representing the conversation messages in this turn."""


class GraphSession(TypedDict, total=False):
    """
    Represents the complete state of a conversation session.

    Declared with ``total=False``, so every key is optional. The node functions
    return partial instances (for example only ``current_turn``) as their state
    update.

    Attributes:
        id: Unique identifier for the session.
        created_at: Timestamp of when the session was created.
        current_turn: The current turn in the session, if any.
        turns: A list of all turns in the session.
    """

    id: uuid.UUID
    """Unique identifier for the session."""

    created_at: datetime.datetime
    """Timestamp of when the session was created."""

    current_turn: Turn | None
    """The current conversation turn."""

    turns: list[Turn]
    """List of all conversation turns in the session."""

### Utility Functions

In [5]:
def is_retryable_error(exception: BaseException) -> bool:
    """
    Decide whether a failed call is worth retrying.

    An exception counts as retryable when it is either:

    * a ``genai_errors.APIError`` whose ``code`` is 429, 502, 503 or 504, or
    * a ``requests.exceptions.ConnectionError``.

    Args:
        exception: The exception raised by the failed call.

    Returns:
        True if the exception is retryable, False otherwise.
    """
    retryable_status_codes = [429, 502, 503, 504]

    # API errors are judged purely on their status code, so an APIError with
    # any other code is never retried.
    if isinstance(exception, genai_errors.APIError):
        return exception.code in retryable_status_codes

    return isinstance(exception, requests.exceptions.ConnectionError)

In [6]:
def stringify_task(task: Task, include_results: bool = True) -> str:
    """
    Render a single task as Markdown text.

    The output always starts with the task goal. When ``include_results`` is
    true, a result section follows after a blank line; a missing or empty
    result is shown as ``incomplete``.

    Args:
        task (Task): The task to render.
        include_results (bool, optional): Whether to append the task result. Defaults to True.

    Returns:
        str: The Markdown-formatted task.
    """
    sections = [f"**Goal**: {task.goal}"]

    if include_results:
        sections.append(f"**Result**: {task.result or 'incomplete'}")

    return "\n\n".join(sections)

In [7]:
def stringify_plan(plan: Plan, include_results: bool = True) -> str:
    """
    Render an execution plan as Markdown text.

    The output starts with the plan goal, followed by one numbered section per
    task (numbering starts at 1), each rendered with ``stringify_task``.

    Args:
        plan (Plan): The execution plan to render.
        include_results (bool, optional): Whether to include each task's result. Defaults to True.

    Returns:
        str: The Markdown-formatted plan.
    """
    task_sections = []
    for number, task in enumerate(plan.tasks, start=1):
        task_body = stringify_task(task, include_results=include_results)
        task_sections.append(f"**Task #{number}**\n\n{task_body}")

    return f"**Plan**: {plan.goal}\n\n" + "\n\n".join(task_sections)

### Core Agent Operations

#### Plan Generator

Generates a plan or a direct response based on the current turn and conversation history.

In [8]:
@retry(
    retry=retry_if_exception(is_retryable_error),
    wait=wait_exponential(min=1, max=10),
    stop=stop_after_attempt(3),
    reraise=True,
)
async def generate_plan(
    current_turn: Turn,
    project: str,
    region: str,
    model_name: str,
    history: list[Turn] | None = None,
) -> PlanOrRespond:
    """
    Generates a plan or a direct response based on the current turn and conversation history.

    This function uses a Gemini model to analyze the user's input and the conversation history
    to determine whether to generate a step-by-step plan for further action or to provide a
    direct response to the user.

    The conversation sent to the model is built as alternating user/model messages. Past turns
    without a stored response get the placeholder model message "EMPTY" so the alternation is
    preserved. The current turn contributes only the user's message (plus its response, if one
    is already set), so the conversation ends on the message the model is expected to answer.

    Args:
        current_turn: The current turn in the conversation, containing the user's input.
        project: The Google Cloud project ID.
        region: The Google Cloud region.
        model_name: The name of the Gemini model to use.
        history: A list of previous turns in the conversation (optional).

    Returns:
        A PlanOrRespond object, which can either contain a Response object (to respond to the user)
        or a Plan object (to generate a new plan).

    Raises:
        ValueError: If the model returns no text, or text that does not validate against
            PlanOrRespond (pydantic's ValidationError is a ValueError subclass).
    """

    history = history or []

    client = genai.Client(vertexai=True, project=project, location=region)

    def _as_content(role: str, text: str | None) -> genai_types.Content:
        """Wraps plain text in a single-part Content message for the given role."""
        return genai_types.Content(
            role=role, parts=[genai_types.Part.from_text(text=text)]
        )

    contents: list[genai_types.Content] = []
    for turn in history:
        contents.append(_as_content("user", turn.get("user_input")))
        # Placeholder keeps user/model alternation for turns with no response.
        contents.append(_as_content("model", turn.get("response") or "EMPTY"))

    # The current turn normally has no response yet. Appending a placeholder
    # model message here would make the conversation end on a fake model reply
    # rather than on the user's request.
    contents.append(_as_content("user", current_turn.get("user_input")))
    current_response = current_turn.get("response")
    if current_response:
        contents.append(_as_content("model", current_response))

    content_response = await client.aio.models.generate_content(
        model=model_name,
        contents=contents,
        config=genai_types.GenerateContentConfig(
            response_mime_type="application/json",
            response_schema=PlanOrRespond,
            system_instruction="""
# Mission
For the given user input, come up with a response to the user or a simple step by step plan.

## Choices
If you can provide a direct response without executing any sub-tasks, provide a response action.
If you need clarification or have follow up questions, provide a response action.
If the user input requires research to answer or looking up realtime data, provide a plan action.

## Instructions for plans
The plan should involve individual tasks, that if executed correctly will yield the correct answer. Do not add any superfluous steps.
The result of the final step should be the final answer. Make sure that each step has all the information needed - do not skip steps.
None of the steps are allowed to be user-facing, they must all be executed by the research agent with no input from the user.
A different responder agent will generate a final response to the user after the researcher executes the plan tasks.
Only add steps to the plan that still NEED to be done. Do not return previously done steps as part of the plan.
""".strip(),
        ),
    )

    # `.text` can be None (for example when no candidate text is returned);
    # fail with an explicit message instead of an opaque JSON validation error.
    response_text = content_response.text
    if response_text is None:
        raise ValueError("Planner model returned no text to parse.")

    plan_reflection = PlanOrRespond.model_validate_json(response_text)

    return plan_reflection

#### Plan Executor

Executes a given plan step-by-step and yields the results of each task.

In [9]:
@retry(
    retry=retry_if_exception(is_retryable_error),
    wait=wait_exponential(min=1, max=10),
    stop=stop_after_attempt(3),
    reraise=True,
)
async def _execute_task(
    client: genai.Client,
    model_name: str,
    contents: str,
    config: genai_types.GenerateContentConfig,
) -> str | None:
    """Runs one executor model call and returns its text, retrying retryable errors."""
    content_response = await client.aio.models.generate_content(
        model=model_name,
        contents=contents,
        config=config,
    )
    return content_response.text


async def execute_plan(
    plan: Plan,
    project: str,
    region: str,
    model_name: str,
) -> AsyncGenerator[tuple[int, Task], None]:
    """
    Executes a given plan step-by-step and yields the results of each task.

    This function iterates through the tasks in a given plan, executes each task using a Gemini model
    with Google Search tool enabled, and yields the index and updated task with the result. Tasks that
    already have a result are skipped. The input plan is not modified; work happens on a deep copy.

    Retries are applied per task (in ``_execute_task``) rather than on this function: this is an
    async generator, and a retry decorator placed on it would only wrap creation of the generator
    object, never the model calls made while it is iterated.

    Args:
        plan: The plan to execute, containing a list of tasks.
        project: The Google Cloud project ID.
        region: The Google Cloud region.
        model_name: The name of the Gemini model to use.

    Yields:
        Tuples of (index, task), where index is the task's position in the plan and task is the
        updated task with the execution result. The result is always a string; it is empty when
        the model returned no text.
    """

    executed_plan = plan.model_copy(deep=True)

    client = genai.Client(vertexai=True, project=project, location=region)

    search_tool = genai_types.Tool(google_search=genai_types.GoogleSearch())
    system_instruction = "Your mission is to execute the research goal provided and respond with findings. The result is not provided directly to the user, but instead provided to another agent to summarize findings."
    # Identical for every task, so build it once.
    generation_config = genai_types.GenerateContentConfig(
        tools=[search_tool],
        system_instruction=system_instruction,
    )

    for idx, task in enumerate(executed_plan.tasks):
        if task.result is not None:
            continue

        # Include every task up to and including the current one. The current
        # task has no result yet; it is filled in from the model response.
        tasks_so_far = executed_plan.tasks[: idx + 1]
        all_tasks_string = "\n---\n".join(
            f"Goal: {seen.goal}\n\nResult: {seen.result or ''}" for seen in tasks_so_far
        )

        contents = f"# Plan\nHigh Level Goal: {plan.goal}\n---\n{all_tasks_string}"

        result_text = await _execute_task(
            client=client,
            model_name=model_name,
            contents=contents,
            config=generation_config,
        )

        # Never leave the result as None: downstream code treats None as
        # "not executed yet".
        task.result = result_text or ""

        yield idx, task

#### Plan Reflector

Reflects on a user's input and an executed plan to determine the next action (response or new plan).

In [10]:
@retry(
    retry=retry_if_exception(is_retryable_error),
    wait=wait_exponential(min=1, max=10),
    stop=stop_after_attempt(3),
    reraise=True,
)
async def reflect_plan(
    user_input: str,
    executed_plan: Plan,
    project: str,
    region: str,
    model_name: str,
) -> PlanOrRespond:
    """
    Reflects on a user's input and an executed plan to determine the next action (response or new plan).

    This function uses a Gemini model to analyze the user's last message, the overall goal of the
    research agent, and the steps that were executed in the previous plan. Based on this analysis,
    it decides whether to generate a direct response to the user or to create a new plan for
    further action.

    Args:
        user_input: The user's most recent input.
        executed_plan: The plan that was previously executed.
        project: The Google Cloud project ID.
        region: The Google Cloud region.
        model_name: The name of the Gemini model to use.

    Returns:
        A PlanOrRespond object, which can either contain a Response object (to respond to the user)
        or a Plan object (to generate a new plan).

    Raises:
        ValueError: If the model returns no text, or text that does not validate against
            PlanOrRespond (pydantic's ValidationError is a ValueError subclass).
    """

    client = genai.Client(vertexai=True, project=project, location=region)

    system_instructions = """
# Mission
For the given user input, come up with a response to the user or a simple step by step plan.

## Choices
If you can provide a direct response without executing any sub-tasks, provide a response action.
If you need clarification or have follow up questions, provide a response action.
If the user input requires multiple steps to answer or looking up realtime data, provide a plan action.

## Instructions for plans
The plan should involve individual tasks, that if executed correctly will yield the correct answer. Do not add any superfluous steps.
The result of the final step should be the final answer. Make sure that each step has all the information needed - do not skip steps.
Only add steps to the plan that still NEED to be done. Do not return previously done steps as part of the plan.
""".strip()

    # Render tasks as readable text (same Goal/Result layout the executor
    # prompt uses). Interpolating the list directly would insert the Python
    # repr of the pydantic objects, with escaped newlines inside each result.
    executed_tasks = "\n---\n".join(
        f"Goal: {task.goal}\n\nResult: {task.result or ''}"
        for task in executed_plan.tasks
    )

    contents = f"""
The last user message was:
{user_input}

The main goal of the research agent was:
{executed_plan.goal}

The research agent executed the following tasks:
{executed_tasks}
""".strip()

    content_response = await client.aio.models.generate_content(
        model=model_name,
        contents=contents,
        config=genai_types.GenerateContentConfig(
            response_mime_type="application/json",
            response_schema=PlanOrRespond,
            system_instruction=system_instructions,
        ),
    )

    # `.text` can be None (for example when no candidate text is returned);
    # fail with an explicit message instead of an opaque JSON validation error.
    response_text = content_response.text
    if response_text is None:
        raise ValueError("Reflector model returned no text to parse.")

    plan_reflection = PlanOrRespond.model_validate_json(response_text)

    return plan_reflection

#### Test all core functions

In [11]:
print("-" * 10, "Reading user input", "-" * 10, end="\n\n")

example_user_input = "research best video games for my nerdy 10yo son"
print(example_user_input, end="\n\n")

print("-" * 10, "Generating plan", "-" * 10, end="\n\n")

example_generated_plan_or_respond = await generate_plan(
    Turn(user_input=example_user_input),
    project=PROJECT,
    region=REGION,
    model_name=PLANNER_MODEL_NAME,
)
example_generated_plan = example_generated_plan_or_respond.action
assert isinstance(example_generated_plan, Plan), "Expected action to be plan"
display(ipd.Markdown(stringify_plan(example_generated_plan)))

async for idx, new_task in execute_plan(
    plan=example_generated_plan,
    project=PROJECT,
    region=REGION,
    model_name=EXECUTOR_MODEL_NAME,
):
    example_generated_plan.tasks[idx] = new_task

    print("-" * 10, "Executed task", "-" * 10, end="\n\n")

    display(ipd.Markdown(f"**Goal**: {new_task.goal}\n\n**Result**: {new_task.result}"))

print("-" * 10, "Reflection on plan", "-" * 10, end="\n\n")

example_reflection_plan_or_respond = await reflect_plan(
    user_input=example_user_input,
    executed_plan=example_generated_plan,
    project=PROJECT,
    region=REGION,
    model_name=REFLECTOR_MODEL_NAME,
)
example_reflection_response = example_reflection_plan_or_respond.action
assert isinstance(
    example_reflection_response, Response
), "Expected action to be response"
display(ipd.Markdown(example_reflection_response.response))

### Nodes

#### Planner Node

Generates a plan or a direct response based on the current turn and conversation history.

In [12]:
async def planner_node(
    state: GraphSession,
    config: lc_config.RunnableConfig,
) -> lg_types.Command[Literal[ExecutorNodeTargetLiteral, PostProcessNodeTargetLiteral]]:
    """
    Asynchronously generates a plan or a direct response based on the current conversation state.

    This function takes the current conversation state, which includes the user's input and history,
    and uses the `generate_plan` function to determine whether to create a plan for further action
    or to provide a direct response. It then updates the conversation state and directs the flow
    to the appropriate next node (executor or post-processing).

    Args:
        state: The current state of the conversation session, including user input and history.
        config: The LangChain RunnableConfig containing agent-specific configurations.

    Returns:
        A Command object that specifies the next node to transition to (executor or post-processing)
        and the updated conversation state. The state includes the generated plan or response.

    Raises:
        AssertionError: If the current turn or its user input is missing from the state.
        TypeError: If the plan reflection action is of an unsupported type.
    """

    agent_config = AgentConfig.model_validate(
        config["configurable"].get("agent_config", {})
    )

    stream_writer = get_stream_writer()

    current_turn = state.get("current_turn")
    assert current_turn is not None, "current turn must be set"

    user_input = current_turn.get("user_input")
    assert user_input is not None, "user input must be set"

    turns = state.get("turns", [])

    plan_reflection = await generate_plan(
        current_turn=current_turn,
        project=agent_config.project,
        region=agent_config.region,
        model_name=agent_config.planner_model_name,
        history=turns,
    )

    if isinstance(plan_reflection.action, Plan):
        next_node = EXECUTOR_NODE_NAME

        # Ensure results aren't set, so the executor runs every task even if
        # the model filled in a result.
        for task in plan_reflection.action.tasks:
            task.result = None

        # Set initial plan
        current_turn["plan"] = plan_reflection.action
        stream_writer({"plan": plan_reflection.action.model_dump(mode="json")})

    elif isinstance(plan_reflection.action, Response):
        next_node = POST_PROCESS_NODE_NAME

        # Update turn response
        current_turn["response"] = plan_reflection.action.response
        stream_writer({"response": plan_reflection.action.response})

    else:
        # Exceptions do not apply %-style formatting to extra arguments, so
        # build the message explicitly.
        raise TypeError(
            f"Unsupported plan reflection action: {type(plan_reflection.action)}"
        )

    return lg_types.Command(
        update=GraphSession(current_turn=current_turn),
        goto=next_node,
    )

#### Executor Node

Executes a given plan step-by-step and yields the results of each task.

In [13]:
async def executor_node(
    state: GraphSession,
    config: lc_config.RunnableConfig,
) -> lg_types.Command[Literal[ReflectorNodeTargetLiteral]]:
    """
    Runs every pending task of the current turn's plan, then hands off to the reflector.

    Tasks are executed through `execute_plan`. As each task finishes it is written back into
    the plan at its original position and emitted on the stream writer under the
    "executed_task" key.

    Args:
        state: The current state of the conversation session, including the plan to execute.
        config: The LangChain RunnableConfig containing agent-specific configurations.

    Returns:
        A Command object that routes to the reflector node and carries the current turn, whose
        plan now holds the executed tasks.

    Raises:
        AssertionError: If the current turn or its plan is missing from the state.
    """

    agent_config = AgentConfig.model_validate(
        config["configurable"].get("agent_config", {})
    )

    emit = get_stream_writer()

    turn = state.get("current_turn")
    assert turn is not None, "current turn must be set"

    pending_plan = turn.get("plan")
    assert pending_plan is not None, "plan must be set"

    task_results = execute_plan(
        plan=pending_plan,
        project=agent_config.project,
        region=agent_config.region,
        model_name=agent_config.executor_model_name,
    )

    async for position, finished_task in task_results:
        # Store the finished task in the plan held by the turn, then stream it.
        pending_plan.tasks[position] = finished_task
        emit({"executed_task": finished_task.model_dump(mode="json")})

    turn["plan"] = pending_plan

    session_update = GraphSession(current_turn=turn)
    return lg_types.Command(update=session_update, goto=REFLECTOR_NODE_NAME)

#### Reflector Node

Reflects on a user's input and an executed plan to determine the next action (response or new plan).

In [14]:
async def reflector_node(
    state: GraphSession,
    config: lc_config.RunnableConfig,
) -> lg_types.Command[Literal[ExecutorNodeTargetLiteral, PlannerNodeTargetLiteral]]:
    """
    Reflect on the executed plan and decide whether to extend it or respond.

    The current turn's user input and fully executed plan are passed to
    `reflect_plan`. Depending on the type of the returned action:

    * `Plan`: its tasks (with any results cleared) are appended to the current
      plan, the extended plan is streamed, and control goes back to the executor.
    * `Response`: the response text is stored on the current turn, streamed, and
      control moves on to the post-process node.

    Args:
        state: The current state of the conversation session, including the executed plan.
        config: The LangChain RunnableConfig containing agent-specific configurations.

    Returns:
        A Command object that specifies the next node to transition to (executor or
        post-process) and the updated conversation state.

    Raises:
        AssertionError: If the current turn, user input or plan is missing, or if any
            plan task has no result yet.
        TypeError: If the plan reflection action is of an unsupported type.
    """
    # NOTE(review): the return annotation lists the planner target, but this node
    # only routes to EXECUTOR_NODE_NAME or POST_PROCESS_NODE_NAME. Confirm the
    # literal alias for the post-process node and update the annotation.

    agent_config = AgentConfig.model_validate(
        config["configurable"].get("agent_config", {})
    )

    stream_writer = get_stream_writer()

    current_turn = state.get("current_turn")
    assert current_turn is not None, "current turn must be set"

    user_input = current_turn.get("user_input")
    assert user_input is not None, "user input must be set"

    plan = current_turn.get("plan")
    assert plan is not None, "plan must be set"

    assert all(
        task.result is not None for task in plan.tasks
    ), "Must execute each plan task before reflection."

    plan_reflection = await reflect_plan(
        user_input=user_input,
        executed_plan=plan,
        project=agent_config.project,
        region=agent_config.region,
        model_name=agent_config.reflector_model_name,
    )

    if isinstance(plan_reflection.action, Plan):
        next_node = EXECUTOR_NODE_NAME

        # Ensure results aren't set, so the new tasks count as not yet executed.
        for task in plan_reflection.action.tasks:
            task.result = None

        # Add new tasks from plan reflection
        current_turn["plan"].tasks += plan_reflection.action.tasks

        stream_writer({"plan": current_turn["plan"].model_dump(mode="json")})

    elif isinstance(plan_reflection.action, Response):
        next_node = POST_PROCESS_NODE_NAME

        # Update turn response
        current_turn["response"] = plan_reflection.action.response

        stream_writer({"response": current_turn["response"]})
    else:  # never
        # Exceptions do not %-format their arguments, so build the message here.
        raise TypeError(
            f"Unsupported plan reflection action: {type(plan_reflection.action)}"
        )

    return lg_types.Command(
        update=GraphSession(current_turn=current_turn),
        goto=next_node,
    )

#### Post-Process Node

Adds the current turn to the conversation history and resets the current turn.

In [15]:
def post_process_node(
    state: GraphSession,
    config: lc_config.RunnableConfig,
) -> lg_types.Command[EndNodeTargetLiteral]:
    """
    Finalize the current conversation turn.

    Validates that the current turn and its response are set, appends the
    completed turn to the conversation history, and resets the current turn so
    the session is ready for the next user input. This function is synchronous.

    Args:
        state: The current state of the conversation session.
        config: The LangChain RunnableConfig (unused in this function).

    Returns:
        A Command object carrying the updated conversation state (`current_turn`
        cleared, `turns` extended). No `goto` target is set.

    Raises:
        AssertionError: If the current turn or its response is missing.
    """

    del config  # unused

    current_turn = state.get("current_turn")

    assert current_turn is not None, "Current turn must be set."
    # Use `.get` so a missing "response" key trips the assertion (with its
    # message) instead of surfacing as a bare KeyError.
    assert (
        current_turn.get("response") is not None
    ), "Response from current turn must be set."

    # Build a new list rather than appending in place to the stored history.
    turns = state.get("turns", []) + [current_turn]

    return lg_types.Command(update=GraphSession(current_turn=None, turns=turns))

## Compile Task Planner Agent

In [16]:
def load_graph():
    """
    Build the (not yet compiled) task planner state graph.

    Registers the planner, executor, reflector and post-process nodes on a
    `StateGraph` over `GraphSession` and makes the planner the entry point.
    No explicit edges are added here.

    Returns:
        The configured state graph, ready to be compiled.
    """
    builder = graph.StateGraph(GraphSession)

    # (node name, node callable) pairs, registered in this order.
    node_table = (
        (PLANNER_NODE_NAME, planner_node),
        (EXECUTOR_NODE_NAME, executor_node),
        (REFLECTOR_NODE_NAME, reflector_node),
        (POST_PROCESS_NODE_NAME, post_process_node),
    )
    for node_name, node_fn in node_table:
        builder.add_node(node_name, node_fn)

    builder.set_entry_point(PLANNER_NODE_NAME)

    return builder


# Build the graph definition (nodes + entry point) from the helper above.
state_graph = load_graph()

# Compile with a checkpointer; `ask` passes a `thread_id` in the run config.
checkpointer = memory.MemorySaver()
compiled_graph = state_graph.compile(checkpointer=checkpointer)

### Visualize Agent Graph

In [17]:
# Render the compiled graph as a Mermaid diagram (PNG bytes).
png_bytes = compiled_graph.get_graph().draw_mermaid_png()

# Show the image inline in the notebook output.
display(ipd.Image(png_bytes))

### Wrapper function to stream generation output to notebook

In [29]:
async def ask(user_input: str, session: str | None = None):
    """
    Run one user turn through the compiled graph and render the streamed output.

    Custom stream chunks keyed by "response", "plan" or "executed_task" are turned
    into Markdown sections and appended to a running transcript. The notebook
    output is redrawn with the whole transcript after every chunk. Chunks with
    none of those keys are printed as unhandled.

    Args:
        user_input: The user's message for this turn.
        session: Thread id to run under; a random hex id is used when not given.
    """
    thread_id = session if session else uuid.uuid4().hex

    run_config = {
        "configurable": {
            "thread_id": thread_id,
            "agent_config": AgentConfig(
                project=PROJECT,
                region=REGION,
                planner_model_name=PLANNER_MODEL_NAME,
                executor_model_name=EXECUTOR_MODEL_NAME,
                reflector_model_name=REFLECTOR_MODEL_NAME,
            ),
        }
    }

    transcript = ""
    source = None  # label of the chunk being rendered
    shown_source = None  # label of the chunk rendered just before it
    executed_count = 0

    async for _mode, chunk in compiled_graph.astream(
        input={"current_turn": {"user_input": user_input}},
        config=run_config,
        stream_mode=["custom"],
    ):
        assert isinstance(chunk, dict), "Expected dictionary chunk"

        section = ""

        if "response" in chunk:
            # Nothing rendered so far means no plan was generated for this turn,
            # so the response is shown without the reflection heading.
            section = (
                chunk["response"]
                if not transcript.strip()
                else "### Reflection\n\n" + chunk["response"]
            )
            source = "response"

        elif "plan" in chunk:
            parsed_plan = Plan.model_validate(chunk["plan"])
            plan_string = stringify_plan(plan=parsed_plan, include_results=False)
            section = f"### Generated execution plan...\n\n{plan_string}"
            source = "plan"

        elif "executed_task" in chunk:
            executed_count += 1
            parsed_task = Task.model_validate(chunk["executed_task"])
            task_string = stringify_task(task=parsed_task, include_results=True)
            section = f"### Executed task #{executed_count}...\n\n{task_string}"
            source = f"executed_task_{executed_count}"

        else:
            print("unhandled chunk case:", chunk)

        # Insert a horizontal rule whenever the kind of chunk changes.
        if not (shown_source is None or shown_source == source):
            section = "\n\n---\n\n" + section

        shown_source = source
        transcript += section

        display(ipd.Markdown(transcript), clear=True)

## Test Conversation

In [30]:
# One session id shared by the turns below; `ask` uses it as the thread id.
session = uuid.uuid4().hex

In [31]:
# Turn 1: a plain greeting to open the conversation on the shared session.
await ask("hi", session=session)

In [32]:
# Turn 2: an open-ended request, sent on the same session as turn 1.
await ask("can you recommend some video games for my child?", session=session)

In [33]:
# Turn 3: follow-up details (age, interests, platform) on the same session.
await ask(
    "he is 15. he's into medieval history so he might like something related. He plays on his Playstation 5",
    session=session,
)

In [34]:
# Turn 4: ask for more background, again reusing the session.
await ask(
    "I don't know anything about gaming. Can you give me more info?",
    session=session,
)