notebooks/processor_intro.ipynb (1,295 lines of code) (raw):
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "pmxT84JrBnal"
},
"outputs": [],
"source": [
"# Copyright 2025 DeepMind Technologies Limited. All Rights Reserved.\n",
"#\n",
"# Licensed under the Apache License, Version 2.0 (the \"License\");\n",
"# you may not use this file except in compliance with the License.\n",
"# You may obtain a copy of the License at\n",
"#\n",
"# http://www.apache.org/licenses/LICENSE-2.0\n",
"#\n",
"# Unless required by applicable law or agreed to in writing, software\n",
"# distributed under the License is distributed on an \"AS IS\" BASIS,\n",
"# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
"# See the License for the specific language governing permissions and\n",
"# limitations under the License."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "J4Na02FxJeQs"
},
"source": [
"[](https://colab.research.google.com/github/google-gemini/genai-processors/blob/main/notebooks/processor_intro.ipynb)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "bfgIREiTJeQs"
},
"source": [
"# Getting Started\n",
"\n",
"Step-by-step tutorial on how to get started with the Genai Processors library."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "fb4sNyKXJeQs"
},
"source": [
"## 1. 🛠️ Setup\n",
"\n",
"First, install the GenAI Processors library:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "HeYMv5yzJeQt"
},
"outputs": [],
"source": [
"!pip install genai-processors"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "A8pCRhTdJeQt"
},
"source": [
"### API Key\n",
"\n",
"To use the GenAI model processors, you will need an API key. If you have not\n",
"done so already, obtain your API key from Google AI Studio, and import it as a\n",
"secret in Colab (recommended) or directly set it below. You can still run this\n",
"tutorial if you do not have an API key, but will need to skip the `Using GenAI\n",
"Models as Processors` section."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "035JT-2fJeQt"
},
"outputs": [],
"source": [
"from google.colab import userdata\n",
"\n",
"API_KEY = userdata.get('GOOGLE_API_KEY')"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "JcFLGGVWJeQt"
},
"source": [
"## 2. 💡 Understanding Core Concepts\n",
"\n",
"The Genai Processors library revolves around three main concepts:\n",
"\n",
"* **`ProcessorPart`:** The standard data object used by Processors. It\n",
" represents a single piece of content **of a given modality**, such as text,\n",
" an image, or structured data. Each `ProcessorPart` can have metadata\n",
" attached, such as `mimetype` or `substream_name`, to categorize and route\n",
" the content.\n",
"* **`Processor`:** A processing unit that takes an asynchronous stream\n",
" (AsyncIterable) of `ProcessorPart` objects as input and returns an\n",
" asynchronous stream of `ProcessorPart` objects as output. Processors can be\n",
" chained together to form complex pipelines.\n",
"* **`PartProcessor`:** A specialized Processor for the case when parts in the\n",
" stream can be processed independently. A PartProcessor takes a single\n",
" `ProcessorPart` and returns an asynchronous stream of `ProcessorPart`. The\n",
" library takes care of invoking the PartProcessors concurrently for each\n",
" `ProcessorPart` in the incoming stream and assembling the output in the\n",
" correct order. This allows for efficient concurrent processing of\n",
" independent parts, avoiding delays when multiple PartProcessors are used in\n",
" sequence.\n",
"\n",
"**NOTE**: It's easy to confuse `ProcessorPart` and `PartProcessor` as they have\n",
"similar names but refer to different concepts:\n",
"\n",
"\u003e * `ProcessorPart` is a **data object** representing a single piece of\n",
"\u003e content.\n",
"\u003e * `PartProcessor` is a **Processor** designed to operate on individual\n",
"\u003e `ProcessorPart`s."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "IZerwuv8JeQt"
},
"source": [
"## 3. 🔨 Creating a Simple Processor\n",
"\n",
"Let's create a simple processor that replaces any `.` character with an `EoS`\n",
"tag. This is done below using a `@processor.processor_function` decorator that\n",
"converts an asynchronous generator function into a `Processor` object. This is a\n",
"convenience method for creating a Processor from a single function."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "EpsZRj-JJeQt"
},
"outputs": [],
"source": [
"from collections.abc import AsyncIterable\n",
"from genai_processors import content_api, processor\n",
"\n",
"\n",
"@processor.processor_function\n",
"async def simple_text_processor(\n",
" content: AsyncIterable[content_api.ProcessorPart],\n",
") -\u003e AsyncIterable[content_api.ProcessorPart]:\n",
" \"\"\"Replaces dots with '[EoS]'.\"\"\"\n",
" async for part in content:\n",
" if content_api.is_text(part.mimetype):\n",
" yield content_api.ProcessorPart(part.text.replace(\".\", \"[EoS]\"))\n",
" else:\n",
" yield part"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "om6gFwr9JeQt"
},
"source": [
"Processors can also be defined by inheriting from the `processor.Processor`\n",
"class and implementing the `call(..)` method. This is the recommended method if\n",
"the Processor requires a persistent state or is parameterized."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "1qfDDd_3JeQt"
},
"outputs": [],
"source": [
"from genai_processors.core import preamble\n",
"\n",
"\n",
"class SimpleTextProcessor(processor.Processor):\n",
"\n",
" def __init__(self, eos_string: str):\n",
" self._eos = eos_string\n",
" # Preamble adds a prefix to a content stream.\n",
" self._preamble = preamble.Preamble(\"Starting. \")\n",
"\n",
" async def call(\n",
" self,\n",
" content: AsyncIterable[content_api.ProcessorPart],\n",
" ) -\u003e AsyncIterable[content_api.ProcessorPart]:\n",
" \"\"\"Replaces dots with '[EoS]'.\"\"\"\n",
" async for part in self._preamble(content):\n",
" if content_api.is_text(part.mimetype):\n",
" yield content_api.ProcessorPart(part.text.replace(\".\", self._eos))\n",
" else:\n",
" yield part"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "b5ZtYHJVJeQt"
},
"source": [
"## 4. ▶️ Applying a Processor\n",
"\n",
"You can apply a Processor to an input stream by iterating over the Processor\n",
"directly with `async for` for asynchronous execution. This is the recommended\n",
"way.\n",
"\n",
"In this notebook, we often use the utility `streams.stream_content` to create an\n",
"input stream from a list of `ProcessorPartTypes`, which includes\n",
"`ProcessorParts` but also generic types like `str` or `bytes`. We will see later\n",
"in the *Working with Streams and AsyncIterables* section how to create input\n",
"streams in more advanced setups.\n",
"\n",
"### Asynchronous Application [recommended]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "Dr4SCwDwJeQt"
},
"outputs": [],
"source": [
"import asyncio\n",
"from genai_processors import streams\n",
"\n",
"input_parts = [\"Hello\", \"World\"]\n",
"input_stream = streams.stream_content(input_parts)\n",
"\n",
"print(\"\\nAsynchronous Output:\")\n",
"async for part in simple_text_processor(input_stream):\n",
" print(part.text)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "inX_-jRBJeQt"
},
"source": [
"Note that `simple_text_processor` is a Processor instance. If we define it as a\n",
"class, it needs to be instantiated before use, so this code:\n",
"\n",
"```python\n",
"async for part in simple_text_processor(stream):\n",
" ...\n",
"```\n",
"\n",
"would have to be replaced by:\n",
"\n",
"```python\n",
"p = SimpleTextProcessor(\"[EoS]\")\n",
"async for part in p(stream):\n",
" ...\n",
"```"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "qrpd86cpJeQt"
},
"source": [
"### Synchronous Application\n",
"\n",
"For synchronous execution, you can apply a processor to a list of\n",
"`ProcessorPart` objects using `processor.apply_sync`."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "oZif0o_pJeQt"
},
"outputs": [],
"source": [
"import nest_asyncio\n",
"\n",
"nest_asyncio.apply() # Needed to run async loops in Colab\n",
"\n",
"processed_parts_sync = processor.apply_sync(simple_text_processor, input_parts)\n",
"\n",
"print(\"Synchronous Output:\")\n",
"for part in processed_parts_sync:\n",
" print(part.text)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "hVeh4DNDJeQt"
},
"source": [
"## 5. ⛓️ Chaining Processors\n",
"\n",
"The real power of the library comes from chaining processors together using the\n",
"`+` operator."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "jrmtTDBCJeQt"
},
"outputs": [],
"source": [
"@processor.processor_function\n",
"async def another_text_processor(\n",
" content: AsyncIterable[content_api.ProcessorPart],\n",
") -\u003e AsyncIterable[content_api.ProcessorPart]:\n",
" \"\"\"Lowercases everything.\"\"\"\n",
" async for part in content:\n",
" if content_api.is_text(part.mimetype):\n",
" yield content_api.ProcessorPart(part.text.lower())\n",
" else:\n",
" yield part\n",
"\n",
"\n",
"chained_processor = simple_text_processor + another_text_processor\n",
"input_streams = streams.stream_content([\"First. Second.\"])\n",
"\n",
"print(\"\\nChained Processor Output:\")\n",
"async for part in chained_processor(input_streams):\n",
" print(part.text)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "2WP9wwGgJeQt"
},
"source": [
"This will output: `first[eos] second[eos]`\n",
"\n",
"The `+` operator takes care of combining `Processors` and `PartProcessors`\n",
"correctly. However, grouping all `PartProcessors` together as much as possible\n",
"will maximize efficiency.\n",
"\n",
"Chaining behaves differently for `ProcessorPart`s in the special `debug` and\n",
"`status` substreams. Parts in those substreams are returned to the caller as\n",
"soon as they are generated, and will not be passed to the next Processor in the\n",
"chain."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "a8jNwCUnJeQt"
},
"outputs": [],
"source": [
"@processor.processor_function\n",
"async def simple_text_processor_with_status(\n",
" content: AsyncIterable[content_api.ProcessorPart],\n",
") -\u003e AsyncIterable[content_api.ProcessorPart]:\n",
" \"\"\"Replaces dots with '[EoS]'.\"\"\"\n",
" async for part in content:\n",
" if content_api.is_text(part.mimetype):\n",
" yield content_api.ProcessorPart(part.text.replace(\".\", \"[EoS]\"))\n",
" yield processor.status(f\"Simple processor done on {part.text}\")\n",
" else:\n",
" yield part\n",
"\n",
"\n",
"chained_processor = simple_text_processor_with_status + another_text_processor\n",
"input_streams = streams.stream_content([\"First.\", \"Second.\"])\n",
"\n",
"print(\"\\nChained Processor Output:\")\n",
"async for part in chained_processor(input_streams):\n",
" print(part)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "e9nvu-pkdrYG"
},
"source": [
"Note that the `ProcessorPart`s in the `status` substream are not processed by\n",
"`another_text_processor` that lowercases everything.\n",
"\n",
"## 6. 🛣️ Parallel and Switch for Processors\n",
"\n",
"Processors can be run in parallel using the `parallel_concat()` function that\n",
"takes a sequence of processors as argument:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "KuqiRfOLexDr"
},
"outputs": [],
"source": [
"input_stream = streams.stream_content([\"First.\", \"Second.\"])\n",
"\n",
"p = [another_text_processor, simple_text_processor_with_status]\n",
"\n",
"p = processor.parallel_concat(p)\n",
"\n",
"print(\"\\nParallel Processor Output:\")\n",
"async for part in p(input_stream):\n",
" print(part)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "wlzjX8DofE_s"
},
"source": [
"The output (without status Parts) is `first., second., First[EoS], Second[EoS]`,\n",
"which is the output of `another_text_processor` followed by the output of\n",
"`simple_text_processor_with_status`: it follows the order of the processor list\n",
"passed as argument to `parallel_concat`.\n",
"\n",
"**WARNING**: Unhandled parts that are passed through by more than one processor\n",
"are repeated in the output.\n",
"\n",
"If you need to output interleaved (not concatenated) results, consider using the\n",
"`PartProcessor` variant described in the \"Parallel Execution of PartProcessor\"\n",
"section.\n",
"\n",
"Such parallel operation is not exclusive: one part is processed across several\n",
"processors at the same time and in parallel. When the processors need to process\n",
"different parts of the input stream in an exclusive way, we can rely on a switch\n",
"operatory."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "xK6Fx75gkABv"
},
"outputs": [],
"source": [
"from genai_processors import switch\n",
"\n",
"input_stream = streams.stream_content([\n",
" content_api.ProcessorPart(\"a1\", substream_name=\"a\"),\n",
" content_api.ProcessorPart(\"b1\", substream_name=\"b\"),\n",
" content_api.ProcessorPart(\"a2\", substream_name=\"a\"),\n",
" content_api.ProcessorPart(\"b2\", substream_name=\"b\"),\n",
" content_api.ProcessorPart(\"b3\", substream_name=\"b\"),\n",
"])\n",
"\n",
"m = (\n",
" switch.Switch(content_api.get_substream_name)\n",
" .case(\"a\", another_text_processor)\n",
" .case(\"b\", simple_text_processor)\n",
" .default(processor.passthrough())\n",
")\n",
"\n",
"print(\"\\nSwitch Processor Output:\")\n",
"async for part in m(input_stream):\n",
" print(part)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "ibKai8zCJeQt"
},
"source": [
"`match` is a switch processor that does not run in parallel but routes the input\n",
"parts based on the conditions defined in the case statements. The first case\n",
"condition that is matched defines which processor is used on the part. If no\n",
"case is matched, the default option is run. In this example, we pass the part as\n",
"is. Leaving default out means that no part is returned when no case is matched.\n",
"\n",
"**TIP**: switch processors can be used to define branching conditions based on\n",
"parts in processor flows.\n",
"\n",
"## 7. 🤖 Using GenAI Models as Processors\n",
"\n",
"The library provides built-in processors for interacting with Google's\n",
"Generative AI models. If you have not provided an API Key above, you can skip\n",
"this section."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "wPLAsKxsJeQt"
},
"outputs": [],
"source": [
"from genai_processors.core import genai_model\n",
"from google.genai import types as genai_types\n",
"\n",
"# Initialize the GenAI model processor\n",
"# Replace 'gemini-2.0-flash' with your desired model name\n",
"genai_processor = genai_model.GenaiModel(\n",
" api_key=API_KEY,\n",
" model_name=\"gemini-2.0-flash\",\n",
" generate_content_config=genai_types.GenerateContentConfig(temperature=0.7),\n",
")\n",
"\n",
"# Chain the GenAI processor with a processor to lowercase all inputs.\n",
"genai_pipeline = another_text_processor + genai_processor\n",
"\n",
"input_prompt_genai = [\n",
" \"Explain the Concept of LARGE LANGUAGE MODELS\",\n",
" \"in two sentences\",\n",
"]\n",
"input_stream_genai = streams.stream_content(input_prompt_genai)\n",
"\n",
"print(\"\\nGenAI Pipeline Output:\")\n",
"async for part in genai_pipeline(input_stream_genai):\n",
" print(part.text)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "ozEE4bcdJeQt"
},
"source": [
"The GenAI Processor library provides a `TTFTSingleStream` Processor to record\n",
"the Time-To-First-Token (TTFT) on any unidirectional streaming Processor. It\n",
"wraps the input Processor, keeping its original logic, and records the time from\n",
"invocation to the first output. This TTFT processor can only be used with GenAI\n",
"models that are not bidirectional (it cannot be applied to a LiveProcessor)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "fg3iMXZVJeQt"
},
"outputs": [],
"source": [
"from genai_processors import debug\n",
"\n",
"# Chain the GenAI processor with a processor to lowercase all inputs.\n",
"genai_pipeline = (\n",
" another_text_processor\n",
" # Add a tag \"GenAI Model\" to which processor the TTFT applies to\n",
" + debug.TTFTSingleStream(\"GenAI Model\", genai_processor)\n",
")\n",
"\n",
"input_prompt_genai = [\n",
" \"Explain the Concept of LARGE LANGUAGE MODELS\",\n",
" \"in two sentences\",\n",
"]\n",
"input_stream_genai = streams.stream_content(input_prompt_genai)\n",
"\n",
"print(\"\\nGenAI Pipeline Output:\")\n",
"async for part in genai_pipeline(input_stream_genai):\n",
" print(part.text)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "R8aIhP13JeQt"
},
"source": [
"The `GenAI Model TTFT=x.xx seconds` appears before the processor output and is\n",
"returned in the `status` substream.\n",
"\n",
"Processors can also be used to easily connect to the Live API, as demonstrated\n",
"in the following example:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "7gcUDpJBJeQt"
},
"outputs": [],
"source": [
"from genai_processors.core import live_model\n",
"from google.genai import types as genai_types\n",
"from IPython.display import Audio, display\n",
"import numpy as np\n",
"\n",
"LIVE_MODEL_NAME = \"gemini-2.0-flash-live-001\"\n",
"\n",
"live_processor = live_model.LiveProcessor(\n",
" api_key=API_KEY,\n",
" model_name=LIVE_MODEL_NAME,\n",
" realtime_config=genai_types.LiveConnectConfig(\n",
" # Basic configuration for real-time text and audio interaction\n",
" output_audio_transcription={}, # Enable transcription of audio output\n",
" realtime_input_config=genai_types.RealtimeInputConfig(\n",
" turn_coverage=( # Model sees all real-time input in a turn\n",
" \"TURN_INCLUDES_ALL_INPUT\"\n",
" )\n",
" ),\n",
" response_modalities=[\"AUDIO\"], # Request audio output\n",
" ),\n",
")\n",
"\n",
"\n",
"@processor.processor_function\n",
"async def collect_audio(\n",
" content: AsyncIterable[content_api.ProcessorPart],\n",
") -\u003e AsyncIterable[content_api.ProcessorPart]:\n",
" \"\"\"Yields a single Part containing all the audio from `content`.\"\"\"\n",
" audio_bytes = b\"\"\n",
" async for part in content:\n",
" if content_api.is_audio(part.mimetype):\n",
" audio_bytes += part.bytes\n",
" elif content_api.is_text(part.mimetype):\n",
" print(part)\n",
" # This is yielded when the input stream is closed.\n",
" yield content_api.ProcessorPart(\n",
" audio_bytes,\n",
" mimetype=\"audio/l16;rate=24000\",\n",
" )\n",
"\n",
"\n",
"# We only add text here, but this can contain audio, images, etc. This would\n",
"# typically come from a camera, microphone, or other input source.\n",
"input_stream = streams.stream_content(\n",
" [\n",
" content_api.ProcessorPart(\n",
" \"How are you today?\", substream_name=\"realtime\"\n",
" )\n",
" ],\n",
" # This is needed for this example only: we wait here to give enough time\n",
" # for the model to generate audio before we close the stream.\n",
" with_delay_sec=7,\n",
")\n",
"print(\"\\nLive Processor Output:\")\n",
"p = live_processor + collect_audio\n",
"async for part in p(input_stream):\n",
" audio_track = Audio(\n",
" data=np.frombuffer(part.bytes, dtype=np.int16),\n",
" rate=24000,\n",
" autoplay=True,\n",
" )\n",
" display(audio_track)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "B867vft-JeQt"
},
"source": [
"## 8. 🧩 Working with PartProcessors\n",
"\n",
"For operations that apply to individual `ProcessorPart` objects independently,\n",
"you can use the `PartProcessor` class, or the\n",
"`@processor.part_processor_function` decorator. A `PartProcessor` can be cast\n",
"into a `Processor` with the `to_processor()` method. The resulting Processor\n",
"runs on all Parts in the input stream concurrently while preserving the order of\n",
"the parts. When chained with another `Processor`, a `PartProcessor` is\n",
"implicitly cast into a `Processor`. The use of `to_processor()` is therefore not\n",
"always needed. But if you want to apply a `PartProcessor` to an\n",
"`AsyncIterable[content_api.ProcessorPart]`, you need to run this method.\n",
"\n",
"When you define a `PartProcessor` you can also add a `match` function that\n",
"defines what `Part` type this processor handles. While optional, it is\n",
"recommended to specify them. It is used to optimize how the GenAI Processor\n",
"library schedules asyncio tasks.\n",
"\n",
"A match function has the following signature:\n",
"\n",
"```python\n",
"def match(part: content_api.ProcessorPart) -\u003e bool:\n",
" \"\"\"Returns False if `part` is irrelevant for the processor, True otherwise.\"\"\"\n",
" ...\n",
"```\n",
"\n",
"The default implementation returns `True`, i.e. it assumes all parts should be\n",
"considered by the processor. It is OK to return `True` even if the part is not\n",
"relevant, the part will be processed and ignored. On the other hand, it is\n",
"important to be correct when returning `False`: any part where `match` returns\n",
"`False` will not be processed at all.\n",
"\n",
"The `match` default implementation can be overridden in the `PartProcessor`\n",
"class or an ad-hoc `match` function can be provided as an extra parameter in the\n",
"`@processor.part_processor_function` decorator as shown below."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "2Lng1QldJeQt"
},
"outputs": [],
"source": [
"def match_text(part: content_api.ProcessorPart) -\u003e bool:\n",
" return content_api.is_text(part.mimetype)\n",
"\n",
"\n",
"@processor.part_processor_function(match_fn=match_text)\n",
"async def duplicate_part(\n",
" part: content_api.ProcessorPart,\n",
") -\u003e AsyncIterable[content_api.ProcessorPart]:\n",
" \"\"\"Duplicates the input part.\"\"\"\n",
" yield part\n",
" yield part\n",
"\n",
"\n",
"input_parts_duplicate = streams.stream_content([\"A\", \"B\"])\n",
"\n",
"# To apply `duplicate_part` on the input *stream*, we need a Processor.\n",
"p = duplicate_part.to_processor()\n",
"\n",
"print(\"\\nPart Processor Output:\")\n",
"async for part in p(input_parts_duplicate):\n",
" print(part.text)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "NwL9nRgUJeQt"
},
"source": [
"This will output: `A`, `A`, `B`, `B`.\n",
"\n",
"The library offers a convenient way to create filters as `PartProcessors` using\n",
"the `create_filter` method that takes a `Callable[[ProcessorPart], bool]` as\n",
"input:\n",
"\n",
"```python\n",
"# Creates a PartProcessor that only outputs the text parts. All other parts\n",
"# are dropped.\n",
"p = processor.create_filter(content_api.is_text)\n",
"```\n",
"\n",
"### A Note on `to_processor()`\n",
"\n",
"PartProcessors do not implement the Processor interface and can sometimes be\n",
"confused with Processors, which can lead to the following wrong code:\n",
"\n",
"```python\n",
"# p is a PartProcessor defined somewhere else in the code.\n",
"p = part_processor\n",
"\n",
"async def my_processor(\n",
" content: AsyncIterable[ProcessorPart]\n",
") -\u003e AsyncIterable[ProcessorPart]:\n",
" # This is an error as `p` expects a ProcessorPart and not an AsyncIterable.\n",
" async for part in p(content):\n",
" ...\n",
"```\n",
"\n",
"This can be easily fixed by casting the PartProcessor into a Processor using the\n",
"`to_processor()` method.\n",
"\n",
"```python\n",
"# p is a now a processor.\n",
"p = part_processor.to_processor()\n",
"\n",
"async def my_processor(\n",
" content: AsyncIterable[ProcessorPart]\n",
") -\u003e AsyncIterable[ProcessorPart]:\n",
" # This is ok, `p` accepts AsyncIterables as input.\n",
" async for part in p(content):\n",
" ...\n",
"```\n",
"\n",
"The `to_processor` method can be applied to Processors and PartProcessors alike.\n",
"In doubt, do not hesitate to apply it to ensure you can use your Processor with\n",
"AsyncIterables.\n",
"\n",
"## 9. 🏎️ Parallel Execution of PartProcessors\n",
"\n",
"You can run multiple PartProcessor instances in parallel using the `//`\n",
"operator."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "WnqzIweBJeQt"
},
"outputs": [],
"source": [
"@processor.part_processor_function\n",
"async def append_star(\n",
" part: content_api.ProcessorPart,\n",
") -\u003e AsyncIterable[content_api.ProcessorPart]:\n",
" \"\"\"Appends a star to the text.\"\"\"\n",
" if content_api.is_text(part.mimetype):\n",
" yield content_api.ProcessorPart(part.text + \"*\")\n",
"\n",
"\n",
"@processor.part_processor_function\n",
"async def append_hash(\n",
" part: content_api.ProcessorPart,\n",
") -\u003e AsyncIterable[content_api.ProcessorPart]:\n",
" \"\"\"Appends a hash to the text.\"\"\"\n",
" if content_api.is_text(part.mimetype):\n",
" yield content_api.ProcessorPart(part.text + \"#\")\n",
"\n",
"\n",
"parallel_processors = append_star // append_hash // processor.PASSTHROUGH_ALWAYS\n",
"\n",
"input_parts_parallel = streams.stream_content([\n",
" \"Item_1\",\n",
" \"Item_2\",\n",
" content_api.ProcessorPart(b\"\", mimetype=\"audio/l16;rate=24000\"),\n",
"])\n",
"\n",
"print(\"\\nParallel Part Processors Output:\")\n",
"async for part in parallel_processors.to_processor()(input_parts_parallel):\n",
" print(part)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "vEK0kiiHJeQt"
},
"source": [
"This will output: `Item_1*`, `Item_1#`, `Item_2*`, `Item_2#`, `\u003caudio part\u003e`.\n",
"\n",
"The `//` operator is applied on `PartProcessors` only. All PartProcessors will\n",
"then run concurrently on the input parts, and their output sequences will be\n",
"concatenated in the order provided to the `//` expression. In this example,\n",
"adding a star is done before adding a hash, as `append_star` is before\n",
"`append_hash` in the expression. The input order is also respected, with\n",
"`Item_1` appearing before `Item_2` in the sequence.\n",
"\n",
"For efficiency, input parts are not copied before passing them to the multiple\n",
"PartProcessors in a `//` expression; rather, the same object is passed. This\n",
"means the PartProcessors *should not* change any of the mutable attributes of\n",
"their input Part argument.\n",
"\n",
"When no output is returned by any of the individual PartProcessors, by default\n",
"nothing is returned from the full expression. A special mode can be triggered,\n",
"which ensures the input part is returned as-is if no Processor in the `//` group\n",
"returns anything:\n",
"\n",
"```python\n",
"parallel_processors = (\n",
" append_star // append_hash // processor.PASSTHROUGH_FALLBACK\n",
")\n",
"```\n",
"\n",
"Such `//` operator can be handy when chunk processors pre-process the input\n",
"based on the input type. A typical usage pattern is as follows where\n",
"`xx_processor` defines a pre-processing step on a given part type:\n",
"\n",
"```python\n",
"p1 = processor.create_filter(content_api.is_image) + image_processor\n",
"p2 = processor.create_filter(content_api.is_audio) + audio_processor\n",
"total_processor = p1 // p2 // processor.PASSTHROUGH_FALLBACK\n",
"```\n",
"\n",
"Similar to Processors, PartProcessors also have a switch statement. It operates\n",
"with PartProcessors and ensures that the parts in the input and output streams\n",
"are ordered the same way. They are processed concurrently though meaning that\n",
"the order should not matter for their execution."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "yb8a0kFno1ae"
},
"outputs": [],
"source": [
"from genai_processors import switch\n",
"\n",
"input_stream = streams.stream_content([\n",
" content_api.ProcessorPart(\"a1\", substream_name=\"a\"),\n",
" content_api.ProcessorPart(\"b1\", substream_name=\"b\"),\n",
" content_api.ProcessorPart(\"a2\", substream_name=\"a\"),\n",
" content_api.ProcessorPart(\"b2\", substream_name=\"b\"),\n",
" content_api.ProcessorPart(\"b3\", substream_name=\"b\"),\n",
"])\n",
"\n",
"m = (\n",
" switch.PartSwitch(content_api.get_substream_name)\n",
" .case(\"a\", append_star)\n",
" .case(\"b\", append_hash)\n",
" .default(processor.passthrough())\n",
")\n",
"\n",
"print(\"\\nPartSwitch Output:\")\n",
"p = m.to_processor()\n",
"async for part in p(input_stream):\n",
" print(part)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "ZoHiJwZNoU85"
},
"source": [
"Note that the order of the output stream is consistent with the order of the\n",
"input stream.\n",
"\n",
"The case conditions in both `Switch` and `PartSwitch` are the same. The main\n",
"difference between the two switches is their types (Processor or PartProcessor)\n",
"and the fact that the order is kept for one (PartProcessor) but not for the\n",
"other. The Switch Processor only preserves order for the chunks going through\n",
"the same case condition and the same Processor.\n",
"\n",
"## 10. 🧱 Handling Different Content Types\n",
"\n",
"The `content_api` module provides utilities for working with various content\n",
"types within `ProcessorPart` objects, such as accessing text, images, or custom\n",
"structured data."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "ywDknChWJeQt"
},
"outputs": [],
"source": [
"import io\n",
"from PIL import Image\n",
"\n",
"# Create a simple black image\n",
"img = Image.new(\"RGB\", (60, 30), color=\"black\")\n",
"img_byte_arr = io.BytesIO()\n",
"img.save(img_byte_arr, format=\"PNG\")\n",
"img_bytes = img_byte_arr.getvalue()\n",
"\n",
"image_part = content_api.ProcessorPart(img_bytes, mimetype=\"image/png\")\n",
"text_part = content_api.ProcessorPart(\"Some text\")\n",
"\n",
"# Accessing content\n",
"print(\"\\nContent API Examples:\")\n",
"print(f\"Text part text: {text_part.text}\")\n",
"print(f\"Image part mimetype: {image_part.mimetype}\")\n",
"\n",
"# Using content_api.as_text to extract text from a list of parts\n",
"all_parts = [text_part, image_part, content_api.ProcessorPart(\" more text\")]\n",
"print(f\"Combined text from parts: {content_api.as_text(all_parts)}\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "bwqWO5g-JeQt"
},
"source": [
"## 11. ⏩ Working with Streams and AsyncIterables\n",
"\n",
"Processors operate on `AsyncIterable` streams of `ProcessorPart` objects. The\n",
"`streams` module provides helpful functions for managing these streams.\n",
"\n",
"### Converting Iterables to AsyncIterables\n",
"\n",
"The `streams.stream_content` function converts a standard Python iterable (like\n",
"a list) into an `AsyncIterable`, which is necessary for processing with\n",
"Processors."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "yNjDb_fuJeQt"
},
"outputs": [],
"source": [
"import asyncio\n",
"from genai_processors import content_api, streams\n",
"\n",
"iterable_data = [\"Part 1\", \"Part 2\", \"Part 3\"]\n",
"async_stream = streams.stream_content(iterable_data)\n",
"\n",
"print(\"\\nProcessing stream:\")\n",
"async for part in async_stream:\n",
" print(f\"Received: {part}\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "i9RwZQTVJeQt"
},
"source": [
"This is mostly used for tests to create an `AsyncIterable` easily from a list.\n",
"You can pass a `with_delay_sec` argument to `stream_content` to make sure the\n",
"items are not all yielded immediately.\n",
"\n",
"### Gathering a Stream into a List\n",
"\n",
"The `streams.gather_stream` function collects all items from an `AsyncIterable`\n",
"into a Python list. This is useful for consuming the entire output of a\n",
"processor when the stream is finite."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "ldyNGoQlJeQt"
},
"outputs": [],
"source": [
"import asyncio\n",
"from genai_processors import content_api, streams\n",
"\n",
"async_stream = streams.stream_content(\n",
" [content_api.ProcessorPart(\"A\"), content_api.ProcessorPart(\"B\")]\n",
")\n",
"gathered_list = await streams.gather_stream(async_stream)\n",
"\n",
"print(\"\\nGathered list from stream:\")\n",
"print(gathered_list)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "0wMDb92PJeQu"
},
"source": [
"### Splitting and Merging Streams\n",
"\n",
"The `streams` module provides functions for splitting a single stream into\n",
"multiple identical streams (`streams.split`) and merging multiple streams into a\n",
"single stream (`streams.merge` and `streams.concat`). These are powerful tools\n",
"for building complex processing graphs where different parts of the pipeline\n",
"need to operate on the same input or combine results from different sources."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "xttgnHCMJeQu"
},
"outputs": [],
"source": [
"import asyncio\n",
"from genai_processors import content_api, processor, streams\n",
"\n",
"\n",
"@processor.processor_function\n",
"async def append_a(\n",
" content: AsyncIterable[content_api.ProcessorPart],\n",
") -\u003e AsyncIterable[content_api.ProcessorPart]:\n",
" async for part in content:\n",
" yield content_api.ProcessorPart(part.text + \"A\")\n",
"\n",
"\n",
"@processor.processor_function\n",
"async def append_b(\n",
" content: AsyncIterable[content_api.ProcessorPart],\n",
") -\u003e AsyncIterable[content_api.ProcessorPart]:\n",
" async for part in content:\n",
" yield content_api.ProcessorPart(part.text + \"B\")\n",
"\n",
"\n",
"initial_stream = streams.stream_content(\n",
" [\"Start\", \"Finish\"],\n",
" # We add a delay after yielding each item. This lets the \"Start\" items be\n",
" # yielded first.\n",
" with_delay_sec=0.001,\n",
")\n",
"\n",
"# Split the stream into two\n",
"stream1, stream2 = streams.split(initial_stream, n=2)\n",
"\n",
"# Process each stream independently\n",
"processed_stream1 = append_a(stream1)\n",
"processed_stream2 = append_b(stream2)\n",
"\n",
"# Merge the processed streams\n",
"merged_stream = streams.merge([processed_stream1, processed_stream2])\n",
"\n",
"print(\"\\nSplit and Merge Example Output:\")\n",
"async for part in merged_stream:\n",
" print(part.text)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "WV4u3I1WJeQu"
},
"source": [
"This example splits the initial stream, processes each branch (appending \"A\" to\n",
"one and \"B\" to the other), and then merges the results. The output order in the\n",
"merged stream might vary depending on task scheduling. We have set a delay\n",
"`with_delay_sec` which makes sure all `start` items are yielded first. If you\n",
"remove it, the scheduling will likely be different.\n",
"\n",
"You can create loops within streams using `merge` and `queues` as follows:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "wrVJvxXHJeQu"
},
"outputs": [],
"source": [
"input_stream = streams.stream_content(\n",
" [content_api.ProcessorPart(\"Hello\"), content_api.ProcessorPart(\"World\")],\n",
" # Adds a 0.1 second delay after streaming each part. This is needed in this\n",
" # example to insert the content of the input_queue into the stream before it\n",
" # is closed.\n",
" with_delay_sec=0.1,\n",
")\n",
"input_queue = asyncio.Queue()\n",
"stream_loop = streams.merge(\n",
" [input_stream, streams.dequeue(input_queue)],\n",
" stop_on_first=True,\n",
")\n",
"\n",
"\n",
"async def inject_new_part():\n",
" async for part in append_a(stream_loop):\n",
" print(part.text)\n",
" # Wait for 0.09 seconds to inject a new part before the next part is\n",
" # streamed.\n",
" await asyncio.sleep(0.09)\n",
" # Inject a \"new_part\" Part in the stream_loop.\n",
" await input_queue.put(content_api.ProcessorPart(\"new_part\"))\n",
"\n",
"\n",
"# This will output: HelloA, new_partA, WorldA\n",
"asyncio.run(inject_new_part())"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "pFKQ0w1oJeQu"
},
"source": [
"Note that `new_part` has been processed by the `append_a` processor as it was\n",
"injected into the `stream_loop` via the `input_queue`. The `stop_on_first`\n",
"should be set to `True` here to stop when the `input_stream` is over, otherwise\n",
"the loop will fill itself with `new_part` infinitely as the `live_queue` is\n",
"never ended.\n",
"\n",
"This idiom lets you create complex pipelines where the output of a processor can\n",
"be processed and re-injected in the processor, a typical pattern with real-time\n",
"agents."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "lTR9hD06TuXq"
},
"source": [
"### Create Streams from Processor Sources\n",
"\n",
"When the input stream comes from an external source like a microphone, a camera\n",
"or a network connection, the `@processor.source` decorator provides a\n",
"straightforward way to **originate** a stream of `ProcessorPart` objects and\n",
"easily create a \"source\" for your data. Think of it as a special kind of\n",
"`Processor` that **starts** a stream of `ProcessorPart` objects, rather than\n",
"just transforming an existing one.\n",
"\n",
"You define a source by writing an `async` generator function that `yields` the\n",
"`ProcessorPart`s you want to produce. For example, here's a source that reads\n",
"input from the terminal:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "W9g5WI2gW7Y0"
},
"outputs": [],
"source": [
"@processor.source\n",
"async def TerminalInput(\n",
" prompt: str,\n",
") -\u003e AsyncIterable[content_api.ProcessorPartTypes]:\n",
" while True:\n",
" input_text = await asyncio.to_thread(input, prompt)\n",
" if input_text == 'q':\n",
" break\n",
" yield input_text\n",
"\n",
"\n",
"async for part in TerminalInput('\u003e'):\n",
" print(part)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "6A0wIqPmXHi1"
},
"source": [
"The `@processor.source` decorator automatically turns your generator into a\n",
"full-fledged `Processor`.\n",
"\n",
"This means:\n",
"\n",
"1. **It can be chained:** You can combine multiple sources (or sources with\n",
" regular processors) using the `+` operator. The parts from one source will\n",
" be merged with the parts generated by the next.\n",
"\n",
" ```python\n",
" # Imagine audio_io.AudioIn and live_model.LiveModel are other processors\n",
" # that produce or process data.\n",
" p = TerminalInput('\u003e') + audio_io.AudioIn(...) + live_model.LiveModel(...)\n",
" # Here, TerminalInput starts the stream, which is then combined with\n",
" # audio input, and finally fed into a live model.\n",
" # endless_stream() is an empty stream that never ends.\n",
" # It is often used when a source initiates the stream.\n",
" async for part in p(streams.endless_stream()):\n",
" # Process the combined output\n",
" pass\n",
" ```\n",
"\n",
" When chaining a source, its input stream (e.g., `streams.endless_stream()`)\n",
" primarily serves to initiate the source's operation. The source then adds\n",
" its generated parts to this stream, allowing subsequent processors in the\n",
" chain to receive data from both the initial stream and the source.\n",
"\n",
"2. **It can be used as an input stream:** Since a source itself acts as an\n",
" `AsyncIterable` of `ProcessorPart`s, you can directly pass it as the input\n",
" to any other `Processor`.\n",
"\n",
" ```python\n",
" # Here, TerminalInput generates parts, and live_model processes them.\n",
" async for part in live_model.LiveModel(...)(TerminalInput('\u003e')):\n",
" # Process parts from the live model\n",
" pass\n",
" ```"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "l4adXghuTtMm"
},
"source": [
"## 12. ➡️ Next Steps\n",
"\n",
"This tutorial covered the basics of creating, applying, and chaining processors,\n",
"as well as working with different content types and GenAI models.\n",
"\n",
"You can move to the\n",
"[create_your_own_processor](https://colab.research.google.com/github/google-gemini/genai-processors/blob/main/notebooks/create_your_own_processor.ipynb)\n",
"notebook to dive deeper into the development of new `Processors`."
]
}
],
"metadata": {
"colab": {
"last_runtime": {
"build_target": "",
"kind": "local"
},
"private_outputs": true,
"provenance": []
},
"kernelspec": {
"display_name": "Python 3",
"name": "python3"
},
"language_info": {
"name": "python"
}
},
"nbformat": 4,
"nbformat_minor": 0
}