{
"cells": [
{
"metadata": {
"id": "DiEUDeWHA44f"
},
"cell_type": "code",
"source": [
"# Copyright 2025 DeepMind Technologies Limited. All Rights Reserved.\n",
"#\n",
"# Licensed under the Apache License, Version 2.0 (the \"License\");\n",
"# you may not use this file except in compliance with the License.\n",
"# You may obtain a copy of the License at\n",
"#\n",
"# http://www.apache.org/licenses/LICENSE-2.0\n",
"#\n",
"# Unless required by applicable law or agreed to in writing, software\n",
"# distributed under the License is distributed on an \"AS IS\" BASIS,\n",
"# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
"# See the License for the specific language governing permissions and\n",
"# limitations under the License."
],
"outputs": [],
"execution_count": null
},
{
"metadata": {
"id": "59hCSLjTBmXC"
},
"cell_type": "markdown",
"source": [
"[Open in Colab](https://colab.research.google.com/github/google/genai-processors/blob/main/notebooks/live_processor_intro.ipynb)"
]
},
{
"metadata": {
"id": "4s8taUwfJYHP"
},
"cell_type": "markdown",
"source": [
"# Real-time Agents with Live Processors\n",
"\n",
"GenAI Processors provides `live_model.LiveProcessor`, a processor that wraps a\n",
"connection to the Gemini Live API. This makes it easy to build real-time agents\n",
"on top of the Live API.\n",
"\n",
"This notebook contains ad-hoc code to collect and render audio output. For a\n",
"plain and simple example of a live agent built on top of processors, see the\n",
"[live_simple_cli](https://github.com/google/genai-processors/blob/main/examples/live_simple_cli.py)\n",
"file, which shows how to connect to the Live API with just a few processors\n",
"(handling both video and audio streams)."
]
},
{
"metadata": {
"id": "8G7p8Vi3JYHP"
},
"cell_type": "markdown",
"source": [
"## 1. 🛠️ Setup\n",
"\n",
"First, install the GenAI Processors library:"
]
},
{
"metadata": {
"id": "GasIyUf5JYHP"
},
"cell_type": "code",
"source": [
"!pip install genai-processors"
],
"outputs": [],
"execution_count": null
},
{
"metadata": {
"id": "FMu9r0YZJYHP"
},
"cell_type": "markdown",
"source": [
"### API Key\n",
"\n",
"To use the GenAI model processors, you will need an API key. If you have not\n",
"done so already, obtain your API key from Google AI Studio and either import it\n",
"as a Colab secret named `GOOGLE_API_KEY` (recommended) or set it directly below."
]
},
{
"metadata": {
"id": "fybEpDUNJYHP"
},
"cell_type": "code",
"source": [
"from google.colab import userdata\n",
"\n",
"API_KEY = userdata.get('GOOGLE_API_KEY')"
],
"outputs": [],
"execution_count": null
},
{
"metadata": {
"id": "0Pr6m_l0JYHP"
},
"cell_type": "markdown",
"source": [
"## 2. 📢 Create a Live API Processor with text input\n",
"\n",
"The following code snippet demonstrates how to create a real-time agent that\n",
"takes text entered by the user and responds with audio. Note that the text\n",
"input is collected in a separate task so that it does not block the model\n",
"response.\n",
"\n",
"Enter **`q`** in the user input to stop the agent."
]
},
{
"metadata": {
"id": "hiaUnd6GJYHP"
},
"cell_type": "code",
"source": [
"import asyncio\n",
"from typing import AsyncIterable\n",
"from genai_processors import content_api\n",
"from genai_processors import processor\n",
"from genai_processors import streams\n",
"from genai_processors.core import live_model\n",
"from google.genai import types as genai_types\n",
"from IPython.display import Audio, display\n",
"import numpy as np\n",
"\n",
"LIVE_MODEL_NAME = \"gemini-2.0-flash-live-001\"\n",
"\n",
"live_processor = live_model.LiveProcessor(\n",
"    api_key=API_KEY,\n",
"    model_name=LIVE_MODEL_NAME,\n",
"    realtime_config=genai_types.LiveConnectConfig(\n",
"        realtime_input_config=genai_types.RealtimeInputConfig(\n",
"            turn_coverage=(  # Model sees all real-time input in a turn\n",
"                \"TURN_INCLUDES_ALL_INPUT\"\n",
"            )\n",
"        ),\n",
"        response_modalities=[\"AUDIO\"],  # Request audio output\n",
"    ),\n",
")\n",
"\n",
"\n",
"@processor.processor_function\n",
"async def collect_audio(\n",
"    content: AsyncIterable[content_api.ProcessorPart],\n",
") -\u003e AsyncIterable[content_api.ProcessorPart]:\n",
"  \"\"\"Yields one Part holding all the audio of each model response.\"\"\"\n",
"  audio_bytes = b\"\"\n",
"  async for part in content:\n",
"    if content_api.is_audio(part.mimetype):\n",
"      # Buffer the audio chunks instead of forwarding them one by one.\n",
"      audio_bytes += part.bytes\n",
"    else:\n",
"      yield part\n",
"    if part.get_metadata(\"generation_complete\"):\n",
"      # The model has finished generating its response: emit the buffered\n",
"      # audio as a single part and reset the buffer.\n",
"      yield content_api.ProcessorPart(\n",
"          audio_bytes,\n",
"          mimetype=\"audio/l16;rate=24000\",\n",
"          metadata={\"turn_complete\": True},\n",
"      )\n",
"      audio_bytes = b\"\"\n",
"\n",
"\n",
"async def text_input(\n",
"    live_queue: asyncio.Queue[content_api.ProcessorPartTypes | None],\n",
"):\n",
"  \"\"\"Gets a single user input and adds it to the `live_queue`.\"\"\"\n",
"  # `input` is a blocking function, so we run it in a separate thread to avoid\n",
"  # blocking the asyncio event loop.\n",
"  await asyncio.sleep(0.5)\n",
"  text_in = await asyncio.to_thread(input, \"User (type q to stop) \u003e \")\n",
"  if text_in == \"q\":\n",
"    # `None` closes the input stream and stops the agent.\n",
"    await live_queue.put(None)\n",
"  else:\n",
"    await live_queue.put(\n",
"        content_api.ProcessorPart(\n",
"            text_in,\n",
"            mimetype=\"text/plain\",\n",
"        )\n",
"    )\n",
"\n",
"\n",
"# Prepare the input stream\n",
"live_queue = asyncio.Queue()\n",
"greetings_stream = streams.stream_content([\"Hi there!\"])\n",
"# This is a common idiom to merge a queue into the input stream. Any part\n",
"# added to `live_queue` will be inserted into the input_stream.\n",
"input_stream = streams.merge(\n",
"    [greetings_stream, streams.dequeue(live_queue)],\n",
")\n",
"\n",
"# Chain the processors. The live processor receives \"Hi there!\" first and\n",
"# produces an audio output, after which we schedule a task to take the user\n",
"# input. The collect_audio processor collects all audio parts from the live\n",
"# processor and yields them as a single part that we can render in this\n",
"# notebook.\n",
"p = live_processor + collect_audio\n",
"user_input_task = None\n",
"async for part in p(input_stream):\n",
"  if content_api.is_audio(part.mimetype):\n",
"    # Render the audio bytes into a playable UI element.\n",
"    display(\n",
"        Audio(\n",
"            data=np.frombuffer(part.bytes, dtype=np.int16),\n",
"            rate=24000,\n",
"            autoplay=True,\n",
"        )\n",
"    )\n",
"    if user_input_task is not None:\n",
"      # Cancel the pending user input: we create a new one each time we\n",
"      # receive a response from the model.\n",
"      user_input_task.cancel()\n",
"    # We create a task here to avoid being blocked by the user input. The model\n",
"    # can output other parts in the meantime.\n",
"    user_input_task = processor.create_task(text_input(live_queue))\n",
"  else:\n",
"    print(part)\n",
"if user_input_task is not None:\n",
"  user_input_task.cancel()\n",
"  try:\n",
"    await user_input_task\n",
"  except asyncio.CancelledError:\n",
"    pass  # Expected if the task was still pending when it was cancelled."
],
"outputs": [],
"execution_count": null
},
{
"metadata": {
"id": "1U9WmKFDJYHP"
},
"cell_type": "markdown",
"source": [
"## 3. ⏰ Ping the model every 5 seconds\n",
"\n",
"To highlight the async nature of real-time agents, we are going to add a task\n",
"that pings the Live API every 5 seconds to trigger a model response. It stops\n",
"after 5 iterations."
]
},
{
"metadata": {
"id": "Dvq3PkPCJYHP"
},
"cell_type": "code",
"source": [
"async def ping(\n",
"    live_queue: asyncio.Queue[content_api.ProcessorPartTypes | None],\n",
"):\n",
"  \"\"\"Waits 5 seconds, then asks the model for a new response.\"\"\"\n",
"  await asyncio.sleep(5)\n",
"  await live_queue.put(\"tell me something new\")\n",
"\n",
"\n",
"# Prepare the input stream\n",
"live_queue = asyncio.Queue()\n",
"ping_stream = streams.stream_content([\"Hi there!\"])\n",
"# This is a common idiom to merge a queue into the input stream. Any part\n",
"# added to `live_queue` will be inserted into the input_stream.\n",
"input_stream = streams.merge(\n",
"    [ping_stream, streams.dequeue(live_queue)],\n",
")\n",
"\n",
"ping_task = None\n",
"ping_count = 0\n",
"\n",
"async for part in p(input_stream):\n",
"  if content_api.is_audio(part.mimetype):\n",
"    # We received a response from the model. Cancel any pending ping before\n",
"    # scheduling a new one.\n",
"    if ping_task is not None:\n",
"      ping_task.cancel()\n",
"\n",
"    # Render the audio bytes into a playable UI element.\n",
"    display(\n",
"        Audio(\n",
"            data=np.frombuffer(part.bytes, dtype=np.int16),\n",
"            rate=24000,\n",
"            autoplay=True,\n",
"        )\n",
"    )\n",
"\n",
"    if ping_count \u003c 4:\n",
"      # Schedule exactly one new ping per model response.\n",
"      ping_count += 1\n",
"      ping_task = processor.create_task(ping(live_queue))\n",
"    else:\n",
"      # Fifth response received: close the input stream to stop the agent.\n",
"      await live_queue.put(None)\n",
"  else:\n",
"    print(part)\n",
"\n",
"if ping_task is not None:\n",
"  ping_task.cancel()\n",
"  try:\n",
"    await ping_task\n",
"  except asyncio.CancelledError:\n",
"    pass  # Expected if the ping was still pending when it was cancelled."
],
"outputs": [],
"execution_count": null
},
{
"metadata": {
"id": "sanIYSsdJYHQ"
},
"cell_type": "markdown",
"source": [
"## 4. ➡️ Next Steps\n",
"\n",
"To explore more advanced features and real-world examples, check out the\n",
"following:\n",
"\n",
"* **[`examples/`](https://github.com/google-gemini/genai-processors/blob/main/examples/)**:\n",
" List of simple examples showcasing how to use different processors.\n",
"* **[`examples/research/`](https://github.com/google-gemini/genai-processors/blob/main/examples/research/README.md)**:\n",
" Demonstrates building a research agent that generates, researches, and\n",
" synthesizes information.\n",
"* **[`examples/live/`](https://github.com/google-gemini/genai-processors/blob/main/examples/live/README.md)**:\n",
" Shows how to build a live commentary agent that processes real-time audio\n",
" and video.\n",
"* **[`core/`](https://github.com/google-gemini/genai-processors/blob/main/core)**:\n",
" Explore the built-in processors for tasks like adding timestamps,\n",
" rate-limiting audio, and event detection."
]
}
],
"metadata": {
"colab": {
"last_runtime": {
"build_target": "",
"kind": "local"
},
"private_outputs": true,
"provenance": []
},
"kernelspec": {
"display_name": "Python 3",
"name": "python3"
},
"language_info": {
"name": "python"
}
},
"nbformat": 4,
"nbformat_minor": 0
}