notebooks/create_your_own_processor.ipynb (696 lines of code) (raw):

{ "cells": [ { "metadata": { "id": "-xxvpd_HBfQz" }, "cell_type": "code", "source": [ "# Copyright 2025 DeepMind Technologies Limited. All Rights Reserved.\n", "#\n", "# Licensed under the Apache License, Version 2.0 (the \"License\");\n", "# you may not use this file except in compliance with the License.\n", "# You may obtain a copy of the License at\n", "#\n", "# http://www.apache.org/licenses/LICENSE-2.0\n", "#\n", "# Unless required by applicable law or agreed to in writing, software\n", "# distributed under the License is distributed on an \"AS IS\" BASIS,\n", "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n", "# See the License for the specific language governing permissions and\n", "# limitations under the License." ], "outputs": [], "execution_count": null }, { "metadata": { "id": "am9RI7iLBgB4" }, "cell_type": "markdown", "source": [ "[![Open in Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/google-gemini/genai-processors/blob/main/notebooks/create_your_own_processor.ipynb)" ] }, { "metadata": { "id": "dwPxLtSyuSW-" }, "cell_type": "markdown", "source": [ "# Create your own Processor\n", "\n", "This section provides a step-by-step tutorial on how to create your own\n", "Processor.\n", "\n", "## 1. 🛠️ Setup\n", "\n", "First, install the GenAI Processors library:" ] }, { "metadata": { "id": "byEpq55KuSW-" }, "cell_type": "code", "source": [ "!pip install genai-processors" ], "outputs": [], "execution_count": null }, { "metadata": { "id": "uAuyZN279p-N" }, "cell_type": "markdown", "source": [ "### API Key\n", "\n", "To use the GenAI model processors, you will need an API key. If you have not\n", "done so already, obtain your API key from Google AI Studio, and import it as a\n", "secret in Colab (recommended) or directly set it below." 
] }, { "metadata": { "id": "IfIaVB7L9z1N" }, "cell_type": "code", "source": [ "from google.colab import userdata\n", "\n", "API_KEY = userdata.get('GOOGLE_API_KEY')" ], "outputs": [], "execution_count": null }, { "metadata": { "id": "jfB7TK_JuSW_" }, "cell_type": "markdown", "source": [ "## 2. 🎯 Decide which type of processor to implement\n", "\n", "GenAI Processors have two main processor types:\n", "\n", "1. Standard `Processor`s that process the stream of `ProcessorPart`s in order.\n", "`Processor`s need to implement the following interface:\n", "\u003e ```python\n", "\u003e @abc.abstractmethod\n", "\u003e async def call(\n", "\u003e   self, content: AsyncIterable[ProcessorPart]\n", "\u003e ) -\u003e AsyncIterable[ProcessorPartTypes]:\n", "\u003e   ...\n", "\u003e ```\n", "2. `PartProcessor`s that process each `ProcessorPart` independently and concurrently. `PartProcessor`s need to implement a similar interface but with a single `ProcessorPart` as input argument:\n", "\u003e ```python\n", "\u003e @abc.abstractmethod\n", "\u003e async def call(\n", "\u003e   self, content: ProcessorPart\n", "\u003e ) -\u003e AsyncIterable[ProcessorPartTypes]:\n", "\u003e   ...\n", "\u003e ```\n", "\n", "Both processors yield `ProcessorPartTypes`, an encompassing type that comprises\n", "`strings`, `PIL.Image.Image`, `genai_part`, or `ProcessorPart`. When returning\n", "anything other than `genai_part` or `ProcessorPart`, the library automatically\n", "wraps the returned object in a `ProcessorPart`. It assumes the `USER` role and\n", "derives the type from the object itself (e.g., strings are inferred as text,\n", "`PIL.Image.Image` as image). Note that raw bytes cannot be returned directly by\n", "Processors or PartProcessors, as their mimetypes cannot be inferred. They\n", "need to be wrapped in a `ProcessorPart` instance with the appropriate mimetype.\n", "\n", "A PartProcessor can be turned into a Processor by using the `to_processor()`\n", "function. 
Under the hood, a PartProcessor is applied to each item of\n", "`content: AsyncIterable[ProcessorPart]` concurrently, enabling efficient\n", "computation that can take much less time than a standard Processor\n", "implementation. It is therefore preferred to implement a PartProcessor\n", "whenever possible -- typically, when the order of computation across the items\n", "in the `content` stream is irrelevant.\n", "\n", "If the order matters (for example, in a Processor that buffers text from the\n", "input stream and checks for a regular expression), then you should implement a \n", "Processor.\n", "\n", "To see the difference in computation time between a Processor and a\n", "PartProcessor, consider the following example. Note that it takes about 20-30\n", "seconds to run because it collects timing statistics." ] }, { "metadata": { "id": "SVQXcbFcuSW_" }, "cell_type": "code", "source": [ "import asyncio\n", "from typing import AsyncIterable\n", "from genai_processors import content_api\n", "from genai_processors import processor\n", "from genai_processors import streams\n", "import nest_asyncio\n", "\n", "nest_asyncio.apply()  # Needed to run async loops in Colab\n", "\n", "\n", "@processor.processor_function\n", "async def upper_case_processor(\n", "    content: AsyncIterable[content_api.ProcessorPart],\n", ") -\u003e AsyncIterable[content_api.ProcessorPartTypes]:\n", "  async for part in content:\n", "    if content_api.is_text(part.mimetype):\n", "      yield part.text.upper()\n", "    else:\n", "      yield part\n", "    # Sleep a bit to simulate more compute intensive task\n", "    await asyncio.sleep(0.001)\n", "\n", "\n", "@processor.part_processor_function\n", "async def upper_case_part_processor(\n", "    part: content_api.ProcessorPart,\n", ") -\u003e AsyncIterable[content_api.ProcessorPartTypes]:\n", "  # The code below is the same block as the `async for` block in the function\n", "  # above.\n", "  if content_api.is_text(part.mimetype):\n", "    yield part.text.upper()\n", "  else:\n", "    
yield part\n", "  # Sleep a bit to simulate more compute intensive task\n", "  await asyncio.sleep(0.001)\n", "\n", "\n", "async def load_test(p: processor.Processor):\n", "  input_stream = streams.stream_content([\"hello\"] * 1000)\n", "  async for _ in p(input_stream):\n", "    pass\n", "\n", "\n", "print(\"time with Processor:\")\n", "%timeit asyncio.run(load_test(upper_case_processor))\n", "print(\"time with PartProcessor:\")\n", "%timeit asyncio.run(load_test(upper_case_part_processor.to_processor()))" ], "outputs": [], "execution_count": null }, { "metadata": { "id": "rI7vuspBxPMo" }, "cell_type": "markdown", "source": [ "Note that PartProcessors are rarely used directly as-is in applications; they\n", "are more commonly used as building blocks when decomposing a complex Processor\n", "into sub-processing units that can then be combined with the `+` or the `//`\n", "operators (chaining and parallel). When used standalone, it is important to use\n", "the `to_processor()` method to convert them into standard Processors. If you\n", "forget this, you will likely get an exception.\n", "\n", "## 3. 🏗️ Implement a Class or a Function\n", "\n", "The examples above defined processors as free functions wrapped with the\n", "`@processor.processor_function` decorator. If the Processor has parameters, it\n", "might be more convenient to define it as a class. 
To do that, extend the\n", "`processor.Processor` or `processor.PartProcessor` class and implement the\n", "`call` method:" ] }, { "metadata": { "id": "5wvVAsGux6NH" }, "cell_type": "code", "source": [ "from genai_processors import processor\n", "\n", "\n", "class PreambleProcessor(processor.Processor):\n", "  \"\"\"Adds a preamble to the content.\"\"\"\n", "\n", "  def __init__(self, preamble: content_api.ProcessorContent):\n", "    self._preamble = preamble\n", "\n", "  async def call(\n", "      self,\n", "      content: AsyncIterable[content_api.ProcessorPart],\n", "  ) -\u003e AsyncIterable[content_api.ProcessorPartTypes]:\n", "    for part in self._preamble:\n", "      yield part\n", "    async for part in content:\n", "      yield part\n", "\n", "\n", "p = PreambleProcessor([\n", "    \"Instruction manual: RP-60 is a rotary retro phone. To dial a number, put\",\n", "    \" your finger in a hole opposite the desired digit and rotate the disk\",\n", "    \" clockwise...\",\n", "])\n", "input_stream = streams.stream_content([\"Where are the buttons?\"])\n", "\n", "async for part in p(input_stream):\n", "  print(part)" ], "outputs": [], "execution_count": null }, { "metadata": { "id": "bC0in12DEMyv" }, "cell_type": "markdown", "source": [ "## 4. 
⚙️ A note on state management\n", "\n", "When an internal state needs to be defined, it is best practice to manage it\n", "inside the `call()` method as done below:" ] }, { "metadata": { "id": "4hLwjYgAFH2J" }, "cell_type": "code", "source": [ "async def call(\n", "    self,\n", "    content: AsyncIterable[content_api.ProcessorPart],\n", ") -\u003e AsyncIterable[content_api.ProcessorPartTypes]:\n", "  # Define your state variables locally: each call gets a fresh copy.\n", "  part_count = 0\n", "\n", "  async for part in content:\n", "    # Update your state variable.\n", "    part_count += 1\n", "    yield part" ], "outputs": [], "execution_count": null }, { "metadata": { "id": "Qo8J9LsPGqLH" }, "cell_type": "markdown", "source": [ "The same Processor can then be called with different input streams without\n", "creating any side effect on the state variable.\n", "\n", "If you need to create a state variable at the class level, a recommended\n", "practice is to raise an exception when `call()` is invoked while the state\n", "variable is still in use by a previous call. Such an overlap would mean that\n", "the state spans two runs of the Processor. See the example below." ] }, { "metadata": { "id": "AORATIAQGqLH" }, "cell_type": "code", "source": [ "class MyProcessor(processor.Processor):\n", "\n", "  def __init__(self):\n", "    self._queue: asyncio.Queue | None = None\n", "\n", "  async def call(\n", "      self,\n", "      content: AsyncIterable[content_api.ProcessorPart],\n", "  ) -\u003e AsyncIterable[content_api.ProcessorPartTypes]:\n", "    if self._queue is not None:\n", "      raise ValueError(\"MyProcessor can only process one stream at a time.\")\n", "    self._queue = asyncio.Queue()\n", "    try:\n", "      ...\n", "      async for part in content:\n", "        ...\n", "    finally:\n", "      self._queue = None" ], "outputs": [], "execution_count": null }, { "metadata": { "id": "oZqAQI-pGqLH" }, "cell_type": "markdown", "source": [ "This pattern ensures that the Processor can be called only on one input stream\n", "at a time, preventing unexpected side effects with shared states." 
] }, { "metadata": { "id": "Xe6mzRJKyPN-" }, "cell_type": "markdown", "source": [ "## 5. ⚡ Creating tasks inside a Processor\n", "\n", "Sometimes Processors may need to create asyncio tasks to process data in\n", "parallel. The GenAI Processors library provides a specific\n", "`processor.create_task` function to manage tasks in a way compatible with\n", "generators and exception handling. We strongly recommend using this function to\n", "create tasks. It uses a context manager similar to TaskGroup under the hood,\n", "and manages task cancellations and exceptions properly." ] }, { "metadata": { "id": "5xpx6WXw0BiO" }, "cell_type": "code", "source": [ "from genai_processors import processor\n", "\n", "\n", "class TeaProcessor(processor.Processor):\n", "\n", "  async def _wait_for_tea_to_brew(self):\n", "    await asyncio.sleep(1)\n", "    print(\"Your acme super-express tea is ready!\")\n", "\n", "  async def call(\n", "      self, content: AsyncIterable[content_api.ProcessorPart]\n", "  ) -\u003e AsyncIterable[content_api.ProcessorPartTypes]:\n", "    yield \"Please have a tea while we process your request\"\n", "    tea_task = processor.create_task(self._wait_for_tea_to_brew())\n", "    async for part in content:\n", "      if content_api.is_text(part.mimetype):\n", "        print(part.text)\n", "    await tea_task\n", "\n", "\n", "input_stream = streams.stream_content(\n", "    [\"Actually \", \" I wanted coffee.\"], with_delay_sec=0.6\n", ")\n", "async for _ in TeaProcessor()(input_stream):\n", "  pass" ], "outputs": [], "execution_count": null }, { "metadata": { "id": "2c2v6ZPK8mkl" }, "cell_type": "markdown", "source": [ "The body of the `async for` loop inside the `call()` method should never block.\n", "If there is a long running operation, it is highly recommended to wrap it into\n", "an asyncio task and to let asyncio switch to another task in the event loop\n", "whenever needed.\n", "\n", "This is done naturally when using `async def` functions executed with \n", "`processor.create_task`. 
Sometimes, however, the long-running operation is\n", "performed synchronously and is not an async function. In that case, use the\n", "`asyncio.to_thread()` function to ensure asyncio can switch to other tasks." ] }, { "metadata": { "id": "wGPiZIhm_hwc" }, "cell_type": "code", "source": [ "import time\n", "\n", "\n", "class TeaProcessor(processor.Processor):\n", "\n", "  def _prepare_tea(self):\n", "    # long running operation - sync mode.\n", "    print(\"Brewing tea...\")\n", "    time.sleep(1)\n", "\n", "  async def _wait_for_tea_to_brew(self):\n", "    # sync method with a long running operation\n", "    await asyncio.to_thread(self._prepare_tea)\n", "    print(\"Your acme super-express tea is ready!\")\n", "\n", "  async def call(\n", "      self, content: AsyncIterable[content_api.ProcessorPart]\n", "  ) -\u003e AsyncIterable[content_api.ProcessorPartTypes]:\n", "    yield \"Please have a tea while we process your request\"\n", "    tea_task = processor.create_task(self._wait_for_tea_to_brew())\n", "    async for part in content:\n", "      if content_api.is_text(part.mimetype):\n", "        print(part.text)\n", "    await tea_task\n", "\n", "\n", "input_stream = streams.stream_content(\n", "    [\"Actually \", \" I wanted coffee.\"], with_delay_sec=0.6\n", ")\n", "async for _ in TeaProcessor()(input_stream):\n", "  pass" ], "outputs": [], "execution_count": null }, { "metadata": { "id": "rbQK8QRj_hwc" }, "cell_type": "markdown", "source": [ "## 6. 🔗 Combining Processors together\n", "\n", "When building a Processor, you will likely see that the overall computation can\n", "be split into smaller ones, each corresponding to a separate Processor. Using\n", "the `+` operator, these computations can be combined into a chain where the next\n", "Processor receives the output of the previous one. If PartProcessors are\n", "involved, the runtime will be able to process incoming Parts in parallel while\n", "still preserving the order of the output.\n", "\n", "In addition, PartProcessors support the parallel operator `//`. 
Parallel\n", "PartProcessors will be run on the same input chunks, and thus don't need to wait\n", "for the previous Processor in a chain before yielding a Part. This is useful for\n", "units of computation that apply non-intersecting transformations, such as\n", "PartProcessors that process distinct document types (PDF, DOC, PPT) into \n", "representations that a model can understand. The output order of the \n", "ProcessorParts follows the order of the input chunks. If multiple Processors\n", "produce output parts for the same input part, their output order will follow the\n", "order of the Processors in the chain operation.\n", "\n", "We also provide some utilities for splitting and merging `ProcessorPart`\n", "streams: `processor.parallel_concat()` concatenates the output streams of\n", "several processors, and `streams.split` splits a stream in two or more streams,\n", "which can then be processed by several Processors in parallel.\n", "\n", "As an example, let's define a compound Processor that prefixes a preamble to the\n", "input, sends it to a model, and then converts the output to upper case:" ] }, { "metadata": { "id": "KRo8DcZm_hwc" }, "cell_type": "code", "source": [ "from genai_processors.core import genai_model\n", "from genai_processors.core import preamble\n", "from google.colab import userdata\n", "from google.genai import types as genai_types\n", "\n", "\n", "class UpperGenAI(processor.Processor):\n", "\n", "  def __init__(self):\n", "    self._preamble = preamble.Preamble(content=['what is the definition of: '])\n", "    self._model = genai_model.GenaiModel(\n", "        # Use your API KEY here\n", "        api_key=userdata.get('GOOGLE_API_KEY'),\n", "        model_name='gemini-2.0-flash',\n", "        generate_content_config=genai_types.GenerateContentConfig(\n", "            temperature=0.7\n", "        ),\n", "    )\n", "    self._post_processing = upper_case_processor\n", "\n", "  async def call(\n", "      self, content: AsyncIterable[content_api.ProcessorPart]\n", "  ) -\u003e 
AsyncIterable[content_api.ProcessorPartTypes]:\n", "    p = self._preamble + self._model + self._post_processing\n", "    async for part in p(content):\n", "      yield part\n", "\n", "\n", "input_stream = streams.stream_content(['processor'])\n", "async for part in UpperGenAI()(input_stream):\n", "  print(part.text)" ], "outputs": [], "execution_count": null }, { "metadata": { "id": "ywEWGeXL_hwc" }, "cell_type": "markdown", "source": [ "Processors can be combined in chains with PartProcessors, and the translation\n", "between the two Processor types is handled by the library. When the chain\n", "contains only PartProcessors, the resulting chain remains a PartProcessor, with\n", "the same efficiency benefits described above. It is therefore recommended to\n", "chain PartProcessors together without including Processors in the middle.\n", "\n", "## 7. 🚧 Debugging and Testing\n", "\n", "Once your Processor is written, you will likely need to inspect what's going on.\n", "The `debug` module offers a couple of logging processors that capture the\n", "`ProcessorPart`s in your pipeline. The code below demonstrates the use of the\n", "`debug.print_stream()` Processor to print the content of a stream between two\n", "Processors. `print_stream` prints each ProcessorPart without modifying it,\n", "before yielding it again. Outside of Colab, you can also use\n", "`debug.log_stream()`, which does the same but uses logging instead of print\n", "statements." 
] }, { "metadata": { "id": "LcNE-D_i_hwc" }, "cell_type": "code", "source": [ "from genai_processors import debug\n", "\n", "\n", "class UpperGenAIWithLogs(processor.Processor):\n", "\n", "  def __init__(self):\n", "    self._preamble = preamble.Preamble(\n", "        content=['In two sentences, what is the definition of: ']\n", "    )\n", "    self._model = genai_model.GenaiModel(\n", "        api_key=API_KEY,\n", "        model_name='gemini-2.0-flash',\n", "        generate_content_config=genai_types.GenerateContentConfig(\n", "            temperature=0.7\n", "        ),\n", "    )\n", "    self._post_processing = upper_case_processor\n", "\n", "  async def call(\n", "      self, content: AsyncIterable[content_api.ProcessorPart]\n", "  ) -\u003e AsyncIterable[content_api.ProcessorPartTypes]:\n", "    p = (\n", "        self._preamble\n", "        # Intercepts any ProcessorPart in this chain and prints it.\n", "        # The input arg is a label indicating where the log is captured.\n", "        + debug.print_stream('Before Model')\n", "        + self._model\n", "        # Intercepts any ProcessorPart in this chain and prints it.\n", "        # The input arg is a label indicating where the log is captured.\n", "        + debug.print_stream('After Model')\n", "        + self._post_processing\n", "    )\n", "    async for part in p(content):\n", "      yield part\n", "\n", "\n", "input_stream = streams.stream_content(['processor'])\n", "async for part in UpperGenAIWithLogs()(input_stream):\n", "  print(part.text)" ], "outputs": [], "execution_count": null }, { "metadata": { "id": "hg80GLtC_hwc" }, "cell_type": "markdown", "source": [ "Note how the `ProcessorParts` are intertwined in the output. This is typical of\n", "a bidi-streaming situation, where the last Processor, `self._post_processing`,\n", "processes each `ProcessorPart` eagerly and produces output while the input\n", "stream is not fully consumed. 
When using this debugging method, it can be\n", "helpful to add labels like \"After Model\" or \"Before Model\" and to filter the\n", "logs with them.\n", "\n", "Testing your Processor can be done easily by using an `IsolatedAsyncioTestCase`\n", "and using a standard `async for` loop to collect the result:\n", "\n", "```python\n", "class TestUpperCaseProcessor(unittest.IsolatedAsyncioTestCase):\n", "\n", "  async def test_to_upper_case_ok(self):\n", "    expected = \"HELLO WORLD!\"\n", "    input_stream = streams.stream_content([\"hello \", \"world!\"])\n", "    actual = content_api.ProcessorContent()\n", "    async for part in upper_case_processor(input_stream):\n", "      actual += part\n", "    # Only collect the processor output from the default substream to filter out\n", "    # any status or debug statements.\n", "    self.assertEqual(actual.as_text(substream_name=\"\"), expected)\n", "```\n", "\n", "The test can also be written in a sync mode using the `processor.apply_sync`\n", "method:" ] }, { "metadata": { "id": "gp7WtOTP_hwc" }, "cell_type": "code", "source": [ "import unittest\n", "\n", "\n", "class TestUpperCaseProcessor(unittest.TestCase):\n", "\n", "  def test_to_upper_case_ok(self):\n", "    expected = \"HELLO WORLD!\"\n", "    actual = content_api.ProcessorContent(\n", "        processor.apply_sync(upper_case_processor, [\"hello \", \"world!\"])\n", "    )\n", "    self.assertEqual(actual.as_text(substream_name=\"\"), expected)\n", "\n", "\n", "if __name__ == \"__main__\":\n", "  unittest.main(argv=[\"first-arg-is-ignored\"], exit=False)" ], "outputs": [], "execution_count": null }, { "metadata": { "id": "S5RJMH_x4yfJ" }, "cell_type": "markdown", "source": [ "## 8. 
➡️ Next Steps\n", "\n", "This tutorial covered the creation of `Processors` and `PartProcessors` and how\n", "to pick the right class based on your use case.\n", "\n", "Check the\n", "[live processor intro](https://colab.research.google.com/github/google-gemini/genai-processors/blob/main/notebooks/live_processor_intro.ipynb)\n", "notebook to dive deeper into creating realtime processors using the Live API." ] } ], "metadata": { "colab": { "last_runtime": { "build_target": "", "kind": "local" }, "private_outputs": true, "provenance": [] }, "kernelspec": { "display_name": "Python 3", "name": "python3" }, "language_info": { "name": "python" } }, "nbformat": 4, "nbformat_minor": 0 }