In [None]:
# Copyright 2025 DeepMind Technologies Limited. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

[![Open in Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/google-gemini/genai-processors/blob/main/notebooks/live_processor_intro.ipynb)

# Real-time Agents with Live Processors

GenAI Processors provides a `live_model.LiveProcessor` processor that wraps a
connection to the Gemini Live API. This makes it easy to build real-time agents
on top of the Live API.

This notebook contains ad-hoc code to collect and render audio output. For a
plain and simple example of a live agent built on top of processors, check the
[live_simple_cli](https://github.com/google-gemini/genai-processors/blob/main/examples/live_simple_cli.py)
file, which showcases how to connect to the Live API with just a few processors
(handling both video and audio streams).

## 1. 🛠️ Setup

First, install the GenAI Processors library:

In [None]:
!pip install genai-processors

### API Key

To use the GenAI model processors, you will need an API key. If you have not
done so already, obtain your API key from Google AI Studio, and import it as a
secret in Colab (recommended) or directly set it below.

In [None]:
import os

try:
  # Only available when the notebook runs inside Colab.
  from google.colab import userdata
except ImportError:
  userdata = None

if userdata is not None:
  # Recommended: read the key from the Colab secret named GOOGLE_API_KEY.
  API_KEY = userdata.get('GOOGLE_API_KEY')
else:
  # Outside Colab, fall back to the GOOGLE_API_KEY environment variable.
  # Alternatively, assign your key to API_KEY directly here.
  API_KEY = os.environ['GOOGLE_API_KEY']

## 2. 📢 Create a Live API Processor with text input

The following code snippet demonstrates how to create a real-time agent that
takes regular text entries from the user and outputs an audio response. Note
that the text input is collected in a separate task to avoid blocking the model
response.

Enter **`q`** in the user input to stop the agent.

In [None]:
import asyncio
from typing import AsyncIterable
from genai_processors import content_api
from genai_processors import processor
from genai_processors import streams
from genai_processors.core import live_model
from google.genai import types as genai_types
from IPython.display import Audio, display
import numpy as np

# Name of the model passed to the live processor.
LIVE_MODEL_NAME = "gemini-2.0-flash-live-001"

# Model sees all real-time input in a turn.
realtime_input_config = genai_types.RealtimeInputConfig(
    turn_coverage="TURN_INCLUDES_ALL_INPUT",
)

# Request audio output from the model.
live_connect_config = genai_types.LiveConnectConfig(
    realtime_input_config=realtime_input_config,
    response_modalities=["AUDIO"],
)

live_processor = live_model.LiveProcessor(
    api_key=API_KEY,
    model_name=LIVE_MODEL_NAME,
    realtime_config=live_connect_config,
)


@processor.processor_function
async def collect_audio(
    content: AsyncIterable[content_api.ProcessorPart],
) -> AsyncIterable[content_api.ProcessorPart]:
  """Buffers audio parts and emits them as one combined audio part.

  Non-audio parts are forwarded unchanged as they arrive. Audio parts are
  accumulated in a buffer. Each time a part carries truthy
  `generation_complete` metadata, the buffered audio is emitted as a single
  part (with `turn_complete` metadata set) and the buffer is reset.

  Args:
    content: Stream of parts to process.

  Yields:
    The non-audio parts of `content`, plus one combined audio part for every
    part flagged with `generation_complete`.
  """
  # A bytearray grows in place; repeated `bytes +=` would copy the whole
  # buffer on every incoming audio chunk.
  audio_buffer = bytearray()
  async for part in content:
    if content_api.is_audio(part.mimetype):
      audio_buffer.extend(part.bytes)
    else:
      yield part
    if part.get_metadata("generation_complete"):
      # Flush everything buffered so far as a single audio part.
      yield content_api.ProcessorPart(
          bytes(audio_buffer),
          mimetype="audio/l16;rate=24000",
          metadata={"turn_complete": True},
      )
      audio_buffer.clear()


async def text_input(
    live_queue: asyncio.Queue[content_api.ProcessorPartTypes | None],
):
  """Reads one line from the user and enqueues it on `live_queue`.

  Entering `q` enqueues `None` instead of a text part (the value this notebook
  uses to stop the agent).
  """
  # Short pause before prompting - presumably to let the audio widget render
  # first; TODO confirm.
  await asyncio.sleep(0.5)
  # `input` is blocking, so it runs in a worker thread to keep the asyncio
  # event loop responsive.
  user_text = await asyncio.to_thread(input, "User (type q to stop) > ")
  if user_text != "q":
    await live_queue.put(
        content_api.ProcessorPart(user_text, mimetype="text/plain")
    )
    return
  await live_queue.put(None)


# Prepare the input stream
live_queue = asyncio.Queue()
greetings_stream = streams.stream_content(["Hi there!"])
# This is a common idiom to merge a queue inside the input stream. Any part
# added to `live_queue` will be inserted into the input_stream.
input_stream = streams.merge(
    [greetings_stream, streams.dequeue(live_queue)],
)

# Prepare the live processor - the live processor will receive "Hi there" first.
# It will produce an audio output, after what we will schedule a task to take
# the user input. The collect_audio processor will collect all audio parts from
# the live processor and yield them as a single part that we can render in this
# notebook.
p = live_processor + collect_audio
user_input_task = None
async for part in p(input_stream):
  if content_api.is_audio(part.mimetype):
    # Render the Audio bytes into a playable UI element.
    display(
        Audio(
            data=np.frombuffer(part.bytes, dtype=np.int16),
            rate=24000,
            autoplay=True,
        )
    )
    if user_input_task is not None:
      # Cancel the existing user input, we create a new one each time we receive
      # a response to the model.
      user_input_task.cancel()
    # We create a task here to avoid being blocked by the user input. The model
    # can output other parts in the meantime.
    user_input_task = processor.create_task(text_input(live_queue))
  else:
    print(part)
if user_input_task is not None:
  user_input_task.cancel()
  await user_input_task

## 3. ⏰ Ping the model every 5 seconds

To highlight the async nature of real-time agents, we are going to add a task
that pings the Live API every 5 seconds to trigger a model response. It stops
after 5 iterations.

In [None]:
async def ping(
    live_queue: asyncio.Queue[content_api.ProcessorPartTypes | None],
    delay_sec: float = 5,
    prompt: str = "tell me something new",
):
  """Waits `delay_sec` seconds, then enqueues `prompt` on `live_queue`.

  Args:
    live_queue: Queue that receives the prompt.
    delay_sec: Seconds to sleep before enqueuing the prompt.
    prompt: Text put on the queue once the delay has elapsed.
  """
  await asyncio.sleep(delay_sec)
  await live_queue.put(prompt)


# Prepare the input stream
live_queue = asyncio.Queue()
ping_stream = streams.stream_content(["Hi there!"])
# This is a common idiom to merge a queue inside the input stream. Any part
# added to `live_queue` will be inserted into the input_stream.
input_stream = streams.merge(
    [ping_stream, streams.dequeue(live_queue)],
)

ping_task = None
ping_count = 0

async for part in p(input_stream):
  if content_api.is_audio(part.mimetype):
    # We received a response from the model, either from the user or from a
    # ping. Reschedule a new ping, in case the user does not enter any input.
    if ping_task is not None:
      ping_task.cancel()

    # Render the Audio bytes into a playable UI element.
    display(
        Audio(
            data=np.frombuffer(part.bytes, dtype=np.int16),
            rate=24000,
            autoplay=True,
        )
    )
    ping_task = processor.create_task(ping(live_queue))

    if ping_count < 4:
      ping_count += 1
      ping_task = processor.create_task(ping(live_queue))
    else:
      await live_queue.put(None)
  else:
    print(part)

if ping_task is not None:
  ping_task.cancel()
  await ping_task

## 4. ➡️ Next Steps

To explore more advanced features and real-world examples, check out the
following:

*   **[`examples/`](https://github.com/google-gemini/genai-processors/blob/main/examples/)**:
    List of simple examples showcasing how to use different processors.
*   **[`examples/research/`](https://github.com/google-gemini/genai-processors/blob/main/examples/research/README.md)**:
    Demonstrates building a research agent that generates, researches, and
    synthesizes information.
*   **[`examples/live/`](https://github.com/google-gemini/genai-processors/blob/main/examples/live/README.md)**:
    Shows how to build a live commentary agent that processes real-time audio
    and video.
*   **[`core/`](https://github.com/google-gemini/genai-processors/blob/main/core)**:
    Explore the built-in processors for tasks like adding timestamps,
    rate-limiting audio, and event detection.