In [None]:
# Copyright 2025 DeepMind Technologies Limited. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

[![Open in Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/google-gemini/genai-processors/blob/main/notebooks/create_your_own_processor.ipynb)

# Create your own Processor

This section provides a step-by-step tutorial on how to create your own
Processor.

## 1. 🛠️ Setup

First, install the GenAI Processors library:

In [None]:
!pip install genai-processors

### API Key

To use the GenAI model processors, you will need an API key. If you have not
done so already, obtain your API key from Google AI Studio, and import it as a
secret in Colab (recommended) or directly set it below.

In [None]:
from google.colab import userdata

API_KEY = userdata.get('GOOGLE_API_KEY')

## 2. 🎯 Decide which type of processor to implement

GenAI Processors have two main processor types:

1. Standard `Processor`s that process the stream of `ProcessorPart`s in order.
`Processor`s need to implement the following interface:
>   ```python
>     @abc.abstractmethod
>     async def call(
>         self, content: AsyncIterable[ProcessorPart]
>     ) -> AsyncIterable[ProcessorPartTypes]:
>       ...
>   ```
2. `PartProcessor`s that process each `ProcessorPart` independently and concurrently. `PartProcessor`s need to implement a similar interface but with a single `ProcessorPart` as input argument:
>   ```python
>     @abc.abstractmethod
>     async def call(
>         self, content: ProcessorPart
>     ) -> AsyncIterable[ProcessorPartTypes]:
>       ...
>   ```

Both processors yield `ProcessorPartTypes`, an encompassing type that covers
strings (`str`), `PIL.Image.Image`, `genai_part`, and `ProcessorPart`. When returning
anything other than `genai_part` or `ProcessorPart`, the library automatically
wraps the returned object in a `ProcessorPart`. It assumes the `USER` role and
derives the type from the object itself (e.g., strings are inferred as text,
`PIL.Image.Image` as image). Note that raw bytes cannot be returned directly by
Processors or PartProcessors, as their mimetypes cannot be inferred. They
need to be wrapped in a `ProcessorPart` instance with the appropriate mimetype.
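
As a rough illustration of this wrapping rule, the sketch below mimics it with a
hypothetical `SketchPart` dataclass and `wrap()` helper. Neither name is part of
the library, and the `text/plain` mimetype and `"user"` role string are
assumptions made for the example; the library's own conversion logic may differ.

```python
import dataclasses
from typing import Any


@dataclasses.dataclass
class SketchPart:
  """Hypothetical stand-in for ProcessorPart (not the library class)."""

  value: Any
  mimetype: str
  role: str = "user"  # Assumed default role for wrapped objects.


def wrap(value: Any) -> SketchPart:
  """Wraps a yielded object, inferring the mimetype when that is possible."""
  if isinstance(value, SketchPart):
    return value  # Already a part: passed through unchanged.
  if isinstance(value, str):
    return SketchPart(value=value, mimetype="text/plain")
  if isinstance(value, bytes):
    # Raw bytes carry no type information, so the caller must wrap them.
    raise TypeError("Wrap bytes in a part with an explicit mimetype.")
  raise TypeError(f"Unsupported type: {type(value).__name__}")


text_part = wrap("hello")
image_part = wrap(SketchPart(value=b"\x89PNG", mimetype="image/png"))
print(text_part.mimetype, text_part.role)
print(image_part.mimetype)
```

The point of the sketch is the asymmetry: strings can be typed automatically,
while bytes only become usable once an explicit mimetype is attached.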

A PartProcessor can be turned into a Processor by using the `to_processor()`
function. Under the hood, a PartProcessor will be applied to each item of
`content: AsyncIterable[ProcessorPart]` concurrently, enabling efficient
computation that can take much less time than a standard Processor
implementation. It is therefore preferred to implement a PartProcessor
whenever possible -- typically, when the order of computation across the items
in the `content` stream is irrelevant.
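
To make the concurrency argument concrete, here is a minimal plain-`asyncio`
sketch of the idea. It is not the library's implementation, and the names
`slow_upper`, `run_sequentially`, and `run_concurrently` are hypothetical. One
coroutine is started per item, and `asyncio.gather` returns the results in input
order, so the output order is preserved.

```python
import asyncio
import time


async def slow_upper(text: str) -> str:
  """Hypothetical per-item work: upper-cases after a simulated delay."""
  await asyncio.sleep(0.01)
  return text.upper()


async def run_sequentially(items: list[str]) -> list[str]:
  # Each item waits for the previous one to finish.
  return [await slow_upper(item) for item in items]


async def run_concurrently(items: list[str]) -> list[str]:
  # All items start at once; gather returns the results in input order.
  return list(await asyncio.gather(*(slow_upper(item) for item in items)))


async def compare(n: int = 50) -> tuple[float, float]:
  items = [f"part-{i}" for i in range(n)]
  start = time.perf_counter()
  sequential = await run_sequentially(items)
  sequential_sec = time.perf_counter() - start
  start = time.perf_counter()
  concurrent = await run_concurrently(items)
  concurrent_sec = time.perf_counter() - start
  assert sequential == concurrent  # Same output, same order.
  return sequential_sec, concurrent_sec


# In a notebook with a running event loop, use "await compare()" instead.
sequential_sec, concurrent_sec = asyncio.run(compare())
print(f"sequential: {sequential_sec:.3f}s, concurrent: {concurrent_sec:.3f}s")
```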

If the order matters (for example, in a Processor that buffers text from the
input stream and checks for a regular expression), then you should implement a 
Processor.
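
The buffering case can be sketched with plain async generators. The example
below is an illustration only: `stream_chunks` and `find_tags` are hypothetical
helpers working on bare strings rather than `ProcessorPart`s, and the `[[...]]`
tag syntax is invented for the example. Because a tag can be split across
chunks, the chunks must be consumed in order and accumulated in a buffer.

```python
import asyncio
import re
from typing import AsyncIterable, AsyncIterator

TAG = re.compile(r"\[\[(.*?)\]\]")  # Invented tag syntax: [[name]].


async def stream_chunks(chunks: list[str]) -> AsyncIterator[str]:
  """Hypothetical helper that turns a list into an async stream."""
  for chunk in chunks:
    yield chunk


async def find_tags(content: AsyncIterable[str]) -> AsyncIterator[str]:
  """Yields tag names, including tags that span several chunks."""
  buffer = ""
  async for chunk in content:
    buffer += chunk
    # Emit every complete tag and drop the consumed prefix of the buffer.
    while (match := TAG.search(buffer)) is not None:
      yield match.group(1)
      buffer = buffer[match.end():]


async def main() -> list[str]:
  chunks = ["call [[st", "art]] then [[", "stop]]"]
  return [tag async for tag in find_tags(stream_chunks(chunks))]


# In a notebook with a running event loop, use "await main()" instead.
tags = asyncio.run(main())
print(tags)
```

Applying the same regular expression to each chunk independently would find no
tag at all here, which is why this kind of logic does not fit a PartProcessor.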

To see the difference in computation time between a Processor and a
PartProcessor, consider the following example. Note that it takes about 20-30
seconds to run because it collects timing statistics.

In [None]:
import asyncio
from typing import AsyncIterable
from genai_processors import content_api
from genai_processors import processor
from genai_processors import streams
import nest_asyncio

nest_asyncio.apply()  # Needed to run async loops in Colab


@processor.processor_function
async def upper_case_processor(
    content: AsyncIterable[content_api.ProcessorPart],
) -> AsyncIterable[content_api.ProcessorPartTypes]:
  async for part in content:
    if content_api.is_text(part.mimetype):
      yield part.text.upper()
    else:
      yield part
    # Sleep a bit to simulate more compute intensive task
    await asyncio.sleep(0.001)


@processor.part_processor_function
async def upper_case_part_processor(
    part: content_api.ProcessorPart,
) -> AsyncIterable[content_api.ProcessorPartTypes]:
  # The code below is the same block as the `async for` block in the function
  # above.
  if content_api.is_text(part.mimetype):
    yield part.text.upper()
  else:
    yield part
  # Sleep a bit to simulate more compute intensive task
  await asyncio.sleep(0.001)


# The parameter is named `p` to avoid shadowing the imported `processor` module.
async def load_test(p: processor.Processor):
  input_stream = streams.stream_content(["hello"] * 1000)
  async for _ in p(input_stream):
    pass


print("time with Processor:")
%timeit asyncio.run(load_test(upper_case_processor))
print("time with PartProcessor:")
%timeit asyncio.run(load_test(upper_case_part_processor.to_processor()))

Note that PartProcessors are rarely used as-is in applications; they are more
commonly used as building blocks when decomposing a complex Processor into
sub-processing units that can then be combined with the `+` (chaining) or `//`
(parallel) operators. When using a PartProcessor standalone on a stream, convert
it into a standard Processor with the `to_processor()` method. If you forget
this, you will likely get an exception.

## 3. 🏗️ Implement a Class or a Function

The examples above defined processors as free functions wrapped with the
`@processor.processor_function` or `@processor.part_processor_function`
decorator. If the Processor has parameters, it might be more convenient to
define it as a class. To do that, extend the `processor.Processor` or
`processor.PartProcessor` class and implement the `call` method:

In [None]:
from genai_processors import processor


class PreambleProcessor(processor.Processor):
  """Adds a preamble to the content."""

  def __init__(self, preamble: content_api.ProcessorContent):
    self._preamble = preamble

  async def call(
      self,
      content: AsyncIterable[content_api.ProcessorPart],
  ) -> AsyncIterable[content_api.ProcessorPartTypes]:
    for part in self._preamble:
      yield part
    async for part in content:
      yield part


p = PreambleProcessor([
    "Instruction manual: RP-60 is a rotary retro phone. To dial a number, put",
    " your finger in a hole opposite the desired digit and rotate the disk",
    " clockwise...",
])
input_stream = streams.stream_content(["Where are the buttons?"])

async for part in p(input_stream):
  print(part)

## 4. ⚙️ A note on state management

When an internal state needs to be defined, it is best practice to manage it
inside the `call()` method as done below:

In [None]:
async def call(
    self,
    content: AsyncIterable[content_api.ProcessorPart],
) -> AsyncIterable[content_api.ProcessorPartTypes]:
  # Define state variables locally so that each call starts from a fresh state.
  state = ...

  async for part in content:
    # Update your state variable.
    state.update()
    yield part

The same Processor can then be called with different input streams without
creating any side effect on the state variable.
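
The following plain-`asyncio` sketch illustrates why local state is safe. It
does not use the library: `CountingSketch`, `stream_chunks`, and `collect` are
hypothetical names, and parts are bare strings. The same object handles two
streams concurrently, and each call keeps its own counter.

```python
import asyncio
from typing import AsyncIterable, AsyncIterator


async def stream_chunks(
    chunks: list[str], delay_sec: float = 0.01
) -> AsyncIterator[str]:
  """Hypothetical helper that streams a list with a small delay."""
  for chunk in chunks:
    await asyncio.sleep(delay_sec)
    yield chunk


class CountingSketch:
  """Hypothetical stand-in for a Processor; all state lives in call()."""

  async def call(self, content: AsyncIterable[str]) -> AsyncIterator[str]:
    count = 0  # Local state: created anew for every call.
    async for part in content:
      count += 1
      yield f"{count}:{part}"


async def collect(stream: AsyncIterable[str]) -> list[str]:
  return [part async for part in stream]


async def main() -> tuple[list[str], list[str]]:
  p = CountingSketch()
  # Two runs of the same object are interleaved by the event loop.
  first, second = await asyncio.gather(
      collect(p.call(stream_chunks(["a", "b", "c"]))),
      collect(p.call(stream_chunks(["x", "y"]))),
  )
  return first, second


# In a notebook with a running event loop, use "await main()" instead.
first, second = asyncio.run(main())
print(first, second)
```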

If you need to create a state variable at the class level, a recommended
practice is to raise an exception when `call()` is invoked while the state
variable is still in use by a previous call. Otherwise, the state would be
shared across two runs of the Processor. See the example below.

In [None]:
class MyProcessor(processor.Processor):

  def __init__(self):
    self._queue: asyncio.Queue | None = None

  async def call(
      self,
      content: AsyncIterable[content_api.ProcessorPart],
  ) -> AsyncIterable[content_api.ProcessorPartTypes]:
    if self._queue is not None:
      raise ValueError("MyProcessor can only process one stream at a time.")
    self._queue = asyncio.Queue()
    try:
      ...
      async for part in content:
        ...
    finally:
      self._queue = None

This pattern ensures that the Processor can be called only on one input stream
at a time, preventing unexpected side effects with shared states.
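
To see the guard in action without the library, the sketch below reproduces the
pattern on a plain class. `SingleRunSketch`, `slow_stream`, and `collect` are
hypothetical names. The second concurrent call is rejected, and the object is
usable again once the first run has finished.

```python
import asyncio
from typing import AsyncIterable, AsyncIterator


class SingleRunSketch:
  """Hypothetical stand-in for a Processor with class-level state."""

  def __init__(self):
    self._queue = None

  async def call(self, content: AsyncIterable[str]) -> AsyncIterator[str]:
    if self._queue is not None:
      raise ValueError("SingleRunSketch is already processing a stream.")
    self._queue = asyncio.Queue()
    try:
      async for part in content:
        yield part
    finally:
      self._queue = None  # Release the state for the next run.


async def slow_stream(items: list[str]) -> AsyncIterator[str]:
  for item in items:
    await asyncio.sleep(0.01)
    yield item


async def collect(stream: AsyncIterable[str]) -> list[str]:
  return [part async for part in stream]


async def main():
  p = SingleRunSketch()
  # The second call starts while the first one is still running.
  results = await asyncio.gather(
      collect(p.call(slow_stream(["a", "b"]))),
      collect(p.call(slow_stream(["c"]))),
      return_exceptions=True,
  )
  reused = await collect(p.call(slow_stream(["d"])))
  return results, reused


# In a notebook with a running event loop, use "await main()" instead.
results, reused = asyncio.run(main())
print(results, reused)
```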

## 5. ⚡ Creating tasks inside a Processor

Sometimes a Processor needs to create asyncio tasks to process data in
parallel. The GenAI Processors library provides a dedicated
`processor.create_task` function that manages tasks in a way that is compatible
with generators and exception handling. We strongly recommend using this
function to create tasks. Under the hood, it uses a context manager similar to
`asyncio.TaskGroup` and handles task cancellations and exceptions properly.

In [None]:
from genai_processors import processor


class TeaProcessor(processor.Processor):

  async def _wait_for_tea_to_brew(self):
    await asyncio.sleep(1)
    print("Your acme super-express tea is ready!")

  async def call(
      self, content: AsyncIterable[content_api.ProcessorPart]
  ) -> AsyncIterable[content_api.ProcessorPartTypes]:
    yield "Please have a tea while we process your request"
    tea_task = processor.create_task(self._wait_for_tea_to_brew())
    async for part in content:
      if content_api.is_text(part.mimetype):
        print(part.text)
    await tea_task


input_stream = streams.stream_content(
    ["Actually ", " I wanted coffee."], with_delay_sec=0.6
)
async for _ in TeaProcessor()(input_stream):
  pass

The body of the `async for` loop inside the `call()` method should never block.
If there is a long-running operation, it is highly recommended to wrap it in an
asyncio task so that asyncio can switch to other tasks in the event loop
whenever needed.

This happens naturally when `async def` functions are executed with
`processor.create_task`. Sometimes, however, the long-running operation is
synchronous rather than an async function. In that case, run it with the
`asyncio.to_thread()` function so that asyncio can still switch to other tasks.

In [None]:
import time


class TeaProcessor(processor.Processor):

  def _prepare_tea(self):
    # Long-running synchronous (blocking) operation.
    print("Brewing tea...")
    time.sleep(1)

  async def _wait_for_tea_to_brew(self):
    # Run the blocking call in a worker thread so the event loop stays free.
    await asyncio.to_thread(self._prepare_tea)
    print("Your acme super-express tea is ready!")

  async def call(
      self, content: AsyncIterable[content_api.ProcessorPart]
  ) -> AsyncIterable[content_api.ProcessorPartTypes]:
    yield "Please have a tea while we process your request"
    tea_task = processor.create_task(self._wait_for_tea_to_brew())
    async for part in content:
      if content_api.is_text(part.mimetype):
        print(part.text)
    await tea_task


input_stream = streams.stream_content(
    ["Actually ", " I wanted coffee."], with_delay_sec=0.6
)
async for _ in TeaProcessor()(input_stream):
  pass
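
To see why `asyncio.to_thread()` matters, the plain-`asyncio` sketch below
counts how often a background "ticker" task gets to run while a blocking call
is in progress. The names `blocking_work`, `count_ticks`, and `measure` are
hypothetical and the example does not use the library. When the blocking call
runs directly on the event loop, the ticker is starved; when it runs in a
worker thread, the ticker keeps running.

```python
import asyncio
import time


def blocking_work(duration_sec: float) -> None:
  """Hypothetical synchronous operation that blocks its thread."""
  time.sleep(duration_sec)


async def count_ticks(stop: asyncio.Event) -> int:
  """Counts how many 10 ms ticks complete before `stop` is set."""
  ticks = 0
  while not stop.is_set():
    await asyncio.sleep(0.01)
    ticks += 1
  return ticks


async def measure(use_thread: bool) -> int:
  stop = asyncio.Event()
  ticker = asyncio.create_task(count_ticks(stop))
  await asyncio.sleep(0)  # Give the ticker a chance to start.
  if use_thread:
    await asyncio.to_thread(blocking_work, 0.2)  # Event loop stays free.
  else:
    blocking_work(0.2)  # Event loop is blocked for the whole duration.
  stop.set()
  return await ticker


# In a notebook with a running event loop, use "await measure(...)" instead.
blocked_ticks = asyncio.run(measure(use_thread=False))
threaded_ticks = asyncio.run(measure(use_thread=True))
print(f"ticks while blocking the loop: {blocked_ticks}")
print(f"ticks with asyncio.to_thread: {threaded_ticks}")
```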

## 6. 🔗 Combining Processors together

When building a Processor, you will likely see that the overall computation can
be split into smaller ones, each corresponding to a separate Processor. Using
the `+` operator, these computations can be combined into a chain where the next
Processor receives the output of the previous one. If PartProcessors are
involved, the runtime will be able to process incoming Parts in parallel while
still preserving the order of the output.
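
Conceptually, chaining feeds the output stream of one stage into the next. The
sketch below shows that idea with plain async generators over strings. It is
not how the library implements `+`, and `chain_sketch`, `add_preamble`, `upper`,
and `stream_chunks` are hypothetical names.

```python
import asyncio
from typing import AsyncIterable, AsyncIterator, Callable

Stage = Callable[[AsyncIterable[str]], AsyncIterable[str]]


def chain_sketch(*stages: Stage) -> Stage:
  """Returns a stage that pipes the stream through `stages` in order."""

  def chained(content: AsyncIterable[str]) -> AsyncIterable[str]:
    for stage in stages:
      content = stage(content)  # Output of one stage is input of the next.
    return content

  return chained


async def add_preamble(content: AsyncIterable[str]) -> AsyncIterator[str]:
  yield "define: "
  async for part in content:
    yield part


async def upper(content: AsyncIterable[str]) -> AsyncIterator[str]:
  async for part in content:
    yield part.upper()


async def stream_chunks(chunks: list[str]) -> AsyncIterator[str]:
  for chunk in chunks:
    yield chunk


async def main() -> list[str]:
  pipeline = chain_sketch(add_preamble, upper)
  return [part async for part in pipeline(stream_chunks(["processor"]))]


# In a notebook with a running event loop, use "await main()" instead.
chained_output = asyncio.run(main())
print(chained_output)
```

Because every stage is a generator, parts flow through the whole chain one at a
time instead of each stage waiting for the previous one to finish.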

In addition, PartProcessors support the parallel operator `//`. Parallel
PartProcessors will be run on the same input chunks, and thus don't need to wait
for the previous Processor in a chain before yielding a Part. This is useful for
units of computation that apply non-intersecting transformations, such as
PartProcessors that process distinct document types (PDF, DOC, PPT) into 
representations that a model can understand. The output order of the 
ProcessorParts follows the order of the input chunks. If multiple Processors
produce output parts for the same input part, their output order follows the
order in which the Processors appear in the `//` expression.
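
The ordering rule can be sketched in plain `asyncio`. This is an illustration
under simplifying assumptions, not the library's implementation of `//`:
`parallel_sketch`, `upper`, and `length` are hypothetical names, parts are bare
strings, and only the functions applied to one part overlap in time.

```python
import asyncio
from typing import AsyncIterable, AsyncIterator, Awaitable, Callable

PartFn = Callable[[str], Awaitable[list[str]]]


async def upper(part: str) -> list[str]:
  await asyncio.sleep(0.02)  # Slower on purpose: order must not depend on it.
  return [part.upper()]


async def length(part: str) -> list[str]:
  await asyncio.sleep(0.01)
  return [f"len={len(part)}"]


async def parallel_sketch(
    content: AsyncIterable[str], fns: list[PartFn]
) -> AsyncIterator[str]:
  """Runs all `fns` on each part; output follows input, then `fns` order."""
  async for part in content:
    outputs = await asyncio.gather(*(fn(part) for fn in fns))
    for output in outputs:  # gather keeps the order of `fns`.
      for item in output:
        yield item


async def stream_chunks(chunks: list[str]) -> AsyncIterator[str]:
  for chunk in chunks:
    yield chunk


async def main() -> list[str]:
  stream = stream_chunks(["ab", "cde"])
  return [item async for item in parallel_sketch(stream, [upper, length])]


# In a notebook with a running event loop, use "await main()" instead.
parallel_output = asyncio.run(main())
print(parallel_output)
```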

We also provide some utilities for splitting and merging `ProcessorPart`
streams: `processor.parallel_concat()` concatenates the output streams of
several processors, and `streams.split()` splits a stream into two or more
streams, which can then be processed by several Processors in parallel.
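
As a rough mental model of splitting, the sketch below copies one async stream
into several using queues. It is a plain-`asyncio` illustration rather than the
library's `streams.split`, whose signature and behavior may differ;
`split_sketch` and the other helpers are hypothetical names.

```python
import asyncio
from typing import Any, AsyncIterable, AsyncIterator

_DONE = object()  # Sentinel marking the end of a stream.


def split_sketch(
    content: AsyncIterable[Any], n: int = 2
) -> list[AsyncIterator[Any]]:
  """Copies `content` into `n` streams. Must be called in a running loop."""
  queues = [asyncio.Queue() for _ in range(n)]

  async def pump() -> None:
    try:
      async for part in content:
        for queue in queues:
          queue.put_nowait(part)
    finally:
      for queue in queues:
        queue.put_nowait(_DONE)  # Always unblock the readers.

  pump_task = asyncio.create_task(pump())

  async def reader(queue: asyncio.Queue) -> AsyncIterator[Any]:
    while (item := await queue.get()) is not _DONE:
      yield item
    await pump_task  # Re-raises any error from the source stream.

  return [reader(queue) for queue in queues]


async def stream_chunks(chunks: list[str]) -> AsyncIterator[str]:
  for chunk in chunks:
    yield chunk


async def upper(content: AsyncIterable[str]) -> list[str]:
  return [part.upper() async for part in content]


async def lengths(content: AsyncIterable[str]) -> list[int]:
  return [len(part) async for part in content]


async def main() -> list[Any]:
  first, second = split_sketch(stream_chunks(["ab", "cde"]))
  # Both copies are consumed concurrently, then concatenated.
  uppercased, sizes = await asyncio.gather(upper(first), lengths(second))
  return uppercased + sizes


# In a notebook with a running event loop, use "await main()" instead.
split_output = asyncio.run(main())
print(split_output)
```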

As an example, let's define a compound Processor that prefixes a preamble to the
input, sends it to a model, and then converts the output to upper case:

In [None]:
from genai_processors.core import genai_model
from genai_processors.core import preamble
from google.colab import userdata
from google.genai import types as genai_types


class UpperGenAI(processor.Processor):

  def __init__(self):
    self._preamble = preamble.Preamble(content=['what is the definition of: '])
    self._model = genai_model.GenaiModel(
        # API_KEY was loaded from the Colab secrets in the Setup section.
        api_key=API_KEY,
        model_name='gemini-2.0-flash',
        generate_content_config=genai_types.GenerateContentConfig(
            temperature=0.7
        ),
    )
    self._post_processing = upper_case_processor

  async def call(
      self, content: AsyncIterable[content_api.ProcessorPart]
  ) -> AsyncIterable[content_api.ProcessorPartTypes]:
    p = self._preamble + self._model + self._post_processing
    async for part in p(content):
      yield part


input_stream = streams.stream_content(['processor'])
async for part in UpperGenAI()(input_stream):
  print(part.text)

Processors can be combined in chains with PartProcessors, and the translation
between the two Processor types is handled by the library. When the chain
contains only PartProcessors, the resulting chain remains a PartProcessor, with
the same efficiency benefits described above. It is therefore recommended to
chain PartProcessors together without including Processors in the middle.

## 7. 🚧 Debugging and Testing

Once your Processor is written, you will likely need to inspect what's going on.
The `debug` library offers a couple of logging processors that capture the
`ProcessorPart`s in your pipeline. The code below demonstrates the use of the
`debug.print_stream()` Processor to print the content of a stream between two
Processors. `print_stream` prints each `ProcessorPart` and then yields it
unmodified. Outside of Colab, you can also use `debug.log_stream()`, which does
the same but uses logging instead of print statements.

In [None]:
from genai_processors import debug


class UpperGenAIWithLogs(processor.Processor):

  def __init__(self):
    self._preamble = preamble.Preamble(
        content=['In two sentences, what is the definition of: ']
    )
    self._model = genai_model.GenaiModel(
        api_key=API_KEY,
        model_name='gemini-2.0-flash',
        generate_content_config=genai_types.GenerateContentConfig(
            temperature=0.7
        ),
    )
    self._post_processing = upper_case_processor

  async def call(
      self, content: AsyncIterable[content_api.ProcessorPart]
  ) -> AsyncIterable[content_api.ProcessorPartTypes]:
    p = (
        self._preamble
        # Intercepts each ProcessorPart at this point and prints it.
        # The argument is a label indicating where the log is captured.
        + debug.print_stream('Before Model')
        + self._model
        # Same interception, this time on the output of the model.
        + debug.print_stream('After Model')
        + self._post_processing
    )
    async for part in p(content):
      yield part


input_stream = streams.stream_content(['processor'])
async for part in UpperGenAIWithLogs()(input_stream):
  print(part.text)

Note how the `ProcessorPart`s are interleaved in the output. This is typical of
a bidirectional streaming situation, where the last Processor,
`self._post_processing`, processes each `ProcessorPart` eagerly and produces
output before the input stream is fully consumed. When using this debugging
method, it helps to add labels like "Before Model" or "After Model" and to
filter the logs with them.
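
The interleaving is easier to reason about with a small model of a
pass-through stage. The sketch below is a plain-`asyncio` illustration, not the
`debug` module's implementation: `tap_sketch` is a hypothetical name, parts are
bare strings, and the `sink` parameter (defaulting to `print`) is an assumption
added so the labels can be captured in a list.

```python
import asyncio
from typing import AsyncIterable, AsyncIterator, Callable


def tap_sketch(label: str, sink: Callable[[str], None] = print):
  """Returns a stage that reports each part to `sink` and yields it as-is."""

  async def stage(content: AsyncIterable[str]) -> AsyncIterator[str]:
    async for part in content:
      sink(f"{label}: {part!r}")
      yield part  # The part itself is not modified.

  return stage


async def upper(content: AsyncIterable[str]) -> AsyncIterator[str]:
  async for part in content:
    yield part.upper()


async def stream_chunks(chunks: list[str]) -> AsyncIterator[str]:
  for chunk in chunks:
    yield chunk


async def main() -> tuple[list[str], list[str]]:
  seen: list[str] = []
  before = tap_sketch("Before", sink=seen.append)
  after = tap_sketch("After", sink=seen.append)
  stream = after(upper(before(stream_chunks(["a", "b"]))))
  output = [part async for part in stream]
  return output, seen


# In a notebook with a running event loop, use "await main()" instead.
tap_output, tap_seen = asyncio.run(main())
print(tap_output)
print(tap_seen)
```

Each part travels through the whole chain before the next one is pulled, so
the "Before" and "After" entries alternate rather than appearing in two blocks.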

Testing your Processor is straightforward: use a
`unittest.IsolatedAsyncioTestCase` and a standard `async for` loop to collect
the result:

```python
import unittest


class TestUpperCaseProcessor(unittest.IsolatedAsyncioTestCase):

  async def test_to_upper_case_ok(self):
    expected = "HELLO WORLD!"
    input_stream = streams.stream_content(["hello ", "world!"])
    actual = content_api.ProcessorContent()
    async for part in upper_case_processor(input_stream):
      actual += part
    # Only collect the processor output from the default substream to filter out
    # any status or debug statements.
    self.assertEqual(actual.as_text(substream_name=""), expected)
```

The test can also be written synchronously using the `processor.apply_sync`
function:

In [None]:
import unittest


class TestUpperCaseProcessor(unittest.TestCase):

  def test_to_upper_case_ok(self):
    expected = "HELLO WORLD!"
    actual = content_api.ProcessorContent(
        processor.apply_sync(upper_case_processor, ["hello ", "world!"])
    )
    self.assertEqual(actual.as_text(substream_name=""), expected)


if __name__ == "__main__":
  unittest.main(argv=["first-arg-is-ignored"], exit=False)

## 8. ➡️ Next Steps

This tutorial covered how to create `Processor`s and `PartProcessor`s and how
to pick the right class for your use case.

Check the
[live processor intro](https://colab.research.google.com/github/google-gemini/genai-processors/blob/main/notebooks/live_processor_intro.ipynb)
notebook to dive deeper into creating real-time processors using the Live API.