In [None]:
# Copyright 2025 DeepMind Technologies Limited. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

[![Open in Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/google-gemini/genai-processors/blob/main/notebooks/processor_intro.ipynb)

# Getting Started

A step-by-step tutorial on getting started with the GenAI Processors library.

## 1. 🛠️ Setup

First, install the GenAI Processors library:

In [None]:
!pip install genai-processors

### API Key

To use the GenAI model processors, you will need an API key. If you have not
done so already, obtain your API key from Google AI Studio, and import it as a
secret in Colab (recommended) or directly set it below. You can still run this
tutorial if you do not have an API key, but will need to skip the `Using GenAI
Models as Processors` section.

In [None]:
from google.colab import userdata

API_KEY = userdata.get('GOOGLE_API_KEY')

## 2. 💡 Understanding Core Concepts

The GenAI Processors library revolves around three main concepts:

*   **`ProcessorPart`:** The standard data object used by Processors. It
    represents a single piece of content **of a given modality**, such as text,
    an image, or structured data. Each `ProcessorPart` can have metadata
    attached, such as `mimetype` or `substream_name`, to categorize and route
    the content.
*   **`Processor`:** A processing unit that takes an asynchronous stream
    (AsyncIterable) of `ProcessorPart` objects as input and returns an
    asynchronous stream of `ProcessorPart` objects as output. Processors can be
    chained together to form complex pipelines.
*   **`PartProcessor`:** A specialized Processor for the case when parts in the
    stream can be processed independently. A PartProcessor takes a single
    `ProcessorPart` and returns an asynchronous stream of `ProcessorPart`. The
    library takes care of invoking the PartProcessors concurrently for each
    `ProcessorPart` in the incoming stream and assembling the output in the
    correct order. This allows for efficient concurrent processing of
    independent parts, avoiding delays when multiple PartProcessors are used in
    sequence.

**NOTE**: It's easy to confuse `ProcessorPart` and `PartProcessor` as they have
similar names but refer to different concepts:

> *   `ProcessorPart` is a **data object** representing a single piece of
>     content.
> *   `PartProcessor` is a **Processor** designed to operate on individual
>     `ProcessorPart`s.
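
To make the two shapes concrete, here is a minimal sketch in plain `asyncio` that does not use the library. `Part`, `upper_stream`, `exclaim_part`, and `map_parts` are hypothetical names invented for this illustration, and `map_parts` is only an assumption about how per-part concurrency with ordered output could work, not the library's actual scheduler.

```python
import asyncio
from collections.abc import AsyncIterable, AsyncIterator, Callable
from dataclasses import dataclass


@dataclass(frozen=True)
class Part:
  """Hypothetical stand-in for ProcessorPart: content plus metadata."""
  text: str
  mimetype: str = "text/plain"


async def to_stream(parts: list[Part]) -> AsyncIterator[Part]:
  for part in parts:
    yield part


async def upper_stream(content: AsyncIterable[Part]) -> AsyncIterator[Part]:
  """Processor shape: consumes a whole stream and yields a new stream."""
  async for part in content:
    yield Part(part.text.upper(), part.mimetype)


async def exclaim_part(part: Part) -> AsyncIterator[Part]:
  """PartProcessor shape: one part in, a stream of parts out."""
  # Longer texts take longer, so tasks can finish out of input order.
  await asyncio.sleep(0.001 * len(part.text))
  yield Part(part.text + "!", part.mimetype)


async def map_parts(
    fn: Callable[[Part], AsyncIterable[Part]],
    content: AsyncIterable[Part],
) -> AsyncIterator[Part]:
  """Runs `fn` on each part concurrently; emits results in input order."""

  async def collect(part: Part) -> list[Part]:
    return [out async for out in fn(part)]

  # Simplification: the whole input is read before any output is emitted.
  tasks = [asyncio.create_task(collect(part)) async for part in content]
  for task in tasks:
    for out in await task:
      yield out


async def main() -> list[str]:
  stream = to_stream([Part("hello"), Part("hi")])
  return [p.text async for p in map_parts(exclaim_part, upper_stream(stream))]


result = asyncio.run(main())
print(result)
```

Although the task for `"HI"` finishes before the one for `"HELLO"`, the results are emitted in input order because the tasks are awaited in the order they were created.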

## 3. 🔨 Creating a Simple Processor

Let's create a simple processor that replaces any `.` character with an `EoS`
tag. This is done below using a `@processor.processor_function` decorator that
converts an asynchronous generator function into a `Processor` object. This is a
convenience method for creating a Processor from a single function.

In [None]:
from collections.abc import AsyncIterable
from genai_processors import content_api, processor


@processor.processor_function
async def simple_text_processor(
    content: AsyncIterable[content_api.ProcessorPart],
) -> AsyncIterable[content_api.ProcessorPart]:
  """Replaces dots with '[EoS]'."""
  async for part in content:
    if content_api.is_text(part.mimetype):
      yield content_api.ProcessorPart(part.text.replace(".", "[EoS]"))
    else:
      yield part

Processors can also be defined by inheriting from the `processor.Processor`
class and implementing the `call(...)` method. This is the recommended approach
when the Processor keeps persistent state or takes parameters.

In [None]:
from genai_processors.core import preamble


class SimpleTextProcessor(processor.Processor):

  def __init__(self, eos_string: str):
    self._eos = eos_string
    # Preamble adds a prefix to a content stream.
    self._preamble = preamble.Preamble("Starting. ")

  async def call(
      self,
      content: AsyncIterable[content_api.ProcessorPart],
  ) -> AsyncIterable[content_api.ProcessorPart]:
    """Adds a preamble and replaces dots with the configured EoS string."""
    async for part in self._preamble(content):
      if content_api.is_text(part.mimetype):
        yield content_api.ProcessorPart(part.text.replace(".", self._eos))
      else:
        yield part

## 4. ▶️ Applying a Processor

You can apply a Processor to an input stream by calling it on the stream and
iterating over the result with `async for`. This asynchronous form is the
recommended way.

In this notebook, we often use the utility `streams.stream_content` to create an
input stream from a list of `ProcessorPartTypes`, which includes
`ProcessorParts` but also generic types like `str` or `bytes`. We will see later
in the *Working with Streams and AsyncIterables* section how to create input
streams in more advanced setups.

### Asynchronous Application [recommended]

In [None]:
import asyncio
from genai_processors import streams

input_parts = ["Hello", "World"]
input_stream = streams.stream_content(input_parts)

print("\nAsynchronous Output:")
async for part in simple_text_processor(input_stream):
  print(part.text)

Note that `simple_text_processor` is a Processor instance. If we define it as a
class, it needs to be instantiated before use, so this code:

```python
async for part in simple_text_processor(stream):
  ...
```

would have to be replaced by:

```python
p = SimpleTextProcessor("[EoS]")
async for part in p(stream):
  ...
```

### Synchronous Application

For synchronous execution, you can apply a processor to a list of
`ProcessorPart` objects using `processor.apply_sync`.

In [None]:
import nest_asyncio

nest_asyncio.apply()  # Needed to run async loops in Colab

processed_parts_sync = processor.apply_sync(simple_text_processor, input_parts)

print("Synchronous Output:")
for part in processed_parts_sync:
  print(part.text)
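
Conceptually, synchronous application means driving the asynchronous stream to completion inside an event loop and collecting the results. The sketch below illustrates that idea with the standard library only. `run_sync` is a hypothetical helper, not the implementation of `processor.apply_sync`, and plain strings stand in for `ProcessorPart` objects.

```python
import asyncio
from collections.abc import AsyncIterable, AsyncIterator, Callable


async def as_stream(items: list[str]) -> AsyncIterator[str]:
  for item in items:
    yield item


async def replace_dots(content: AsyncIterable[str]) -> AsyncIterator[str]:
  async for text in content:
    yield text.replace(".", "[EoS]")


def run_sync(
    fn: Callable[[AsyncIterable[str]], AsyncIterable[str]],
    items: list[str],
) -> list[str]:
  """Hypothetical helper: drives an async stream function to completion."""

  async def drain() -> list[str]:
    return [out async for out in fn(as_stream(items))]

  # asyncio.run starts an event loop, runs `drain`, and closes the loop.
  return asyncio.run(drain())


outputs = run_sync(replace_dots, ["Hello.", "World."])
print(outputs)
```

`asyncio.run` raises a `RuntimeError` when an event loop is already running in the current thread, which is the situation in Colab and the reason the cell above calls `nest_asyncio.apply()` first.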

## 5. ⛓️ Chaining Processors

The real power of the library comes from chaining processors together using the
`+` operator.

In [None]:
@processor.processor_function
async def another_text_processor(
    content: AsyncIterable[content_api.ProcessorPart],
) -> AsyncIterable[content_api.ProcessorPart]:
  """Lowercases everything."""
  async for part in content:
    if content_api.is_text(part.mimetype):
      yield content_api.ProcessorPart(part.text.lower())
    else:
      yield part


chained_processor = simple_text_processor + another_text_processor
input_streams = streams.stream_content(["First. Second."])

print("\nChained Processor Output:")
async for part in chained_processor(input_streams):
  print(part.text)

This will output: `first[eos] second[eos]`

The `+` operator takes care of combining `Processors` and `PartProcessors`
correctly. However, grouping all `PartProcessors` together as much as possible
will maximize efficiency.
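
One way to picture what `+` does is function composition over streams: the output stream of the left operand becomes the input stream of the right operand. The sketch below shows this with plain async generators. `Stage` is a hypothetical wrapper written for this illustration, and it leaves out everything the library adds on top, such as `PartProcessor` grouping and reserved substreams.

```python
import asyncio
from collections.abc import AsyncIterable, AsyncIterator, Callable

StreamFn = Callable[[AsyncIterable[str]], AsyncIterable[str]]


class Stage:
  """Hypothetical wrapper that makes stream functions composable with `+`."""

  def __init__(self, fn: StreamFn):
    self._fn = fn

  def __call__(self, content: AsyncIterable[str]) -> AsyncIterable[str]:
    return self._fn(content)

  def __add__(self, other: "Stage") -> "Stage":
    # The output stream of `self` becomes the input stream of `other`.
    return Stage(lambda content: other(self(content)))


async def as_stream(items: list[str]) -> AsyncIterator[str]:
  for item in items:
    yield item


async def replace_dots(content: AsyncIterable[str]) -> AsyncIterator[str]:
  async for text in content:
    yield text.replace(".", "[EoS]")


async def lowercase(content: AsyncIterable[str]) -> AsyncIterator[str]:
  async for text in content:
    yield text.lower()


async def main() -> list[str]:
  pipeline = Stage(replace_dots) + Stage(lowercase)
  return [text async for text in pipeline(as_stream(["First. Second."]))]


chained_output = asyncio.run(main())
print(chained_output)
```

Because both stages are lazy generators, each part flows through the whole chain as soon as it is produced; nothing is buffered between stages.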

Chaining behaves differently for `ProcessorPart`s in the special `debug` and
`status` substreams. Parts in those substreams are returned to the caller as
soon as they are generated, and will not be passed to the next Processor in the
chain.

In [None]:
@processor.processor_function
async def simple_text_processor_with_status(
    content: AsyncIterable[content_api.ProcessorPart],
) -> AsyncIterable[content_api.ProcessorPart]:
  """Replaces dots with '[EoS]'."""
  async for part in content:
    if content_api.is_text(part.mimetype):
      yield content_api.ProcessorPart(part.text.replace(".", "[EoS]"))
      yield processor.status(f"Simple processor done on {part.text}")
    else:
      yield part


chained_processor = simple_text_processor_with_status + another_text_processor
input_streams = streams.stream_content(["First.", "Second."])

print("\nChained Processor Output:")
async for part in chained_processor(input_streams):
  print(part)

Note that the `ProcessorPart`s in the `status` substream are not processed by
`another_text_processor` that lowercases everything.

## 6. 🛣️ Parallel and Switch for Processors

Processors can be run in parallel using the `parallel_concat()` function that
takes a sequence of processors as argument:

In [None]:
input_stream = streams.stream_content(["First.", "Second."])

p = [another_text_processor, simple_text_processor_with_status]

p = processor.parallel_concat(p)

print("\nParallel Processor Output:")
async for part in p(input_stream):
  print(part)

The output (without status Parts) is `first., second., First[EoS], Second[EoS]`,
which is the output of `another_text_processor` followed by the output of
`simple_text_processor_with_status`: it follows the order of the processor list
passed as argument to `parallel_concat`.

**WARNING**: Unhandled parts that are passed through by more than one processor
are repeated in the output.

If you need interleaved (rather than concatenated) results, consider using the
`PartProcessor` variant described in the "Parallel Execution of PartProcessors"
section.
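
The concatenation order and the warning about repeated parts can be reproduced with a small standard-library sketch. `parallel_concat_sketch` is a hypothetical function that buffers the input for simplicity; it is an assumption about the observable behavior, not the library's implementation, and plain `str` and `bytes` values stand in for parts.

```python
import asyncio
from collections.abc import AsyncIterable, AsyncIterator, Callable, Sequence

StreamFn = Callable[[AsyncIterable[object]], AsyncIterable[object]]


async def replay(items: Sequence[object]) -> AsyncIterator[object]:
  for item in items:
    yield item


async def lowercase(content: AsyncIterable[object]) -> AsyncIterator[object]:
  async for item in content:
    # Non-text items are passed through unchanged.
    yield item.lower() if isinstance(item, str) else item


async def mark_eos(content: AsyncIterable[object]) -> AsyncIterator[object]:
  async for item in content:
    yield item.replace(".", "[EoS]") if isinstance(item, str) else item


async def parallel_concat_sketch(
    fns: Sequence[StreamFn],
    content: AsyncIterable[object],
) -> AsyncIterator[object]:
  """Feeds every function the same input and concatenates their outputs."""
  # Simplification: buffer the input instead of splitting a live stream.
  items = [item async for item in content]

  async def drain(fn: StreamFn) -> list[object]:
    return [out async for out in fn(replay(items))]

  # Branches run concurrently; gather returns results in the order of `fns`.
  for outputs in await asyncio.gather(*(drain(fn) for fn in fns)):
    for out in outputs:
      yield out


async def main() -> list[object]:
  source = replay(["First.", b"\x00", "Second."])
  return [
      out
      async for out in parallel_concat_sketch([lowercase, mark_eos], source)
  ]


parallel_output = asyncio.run(main())
print(parallel_output)
```

The `bytes` item is not handled by either branch, so both pass it through and it appears twice in the result.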

This parallel operation is not exclusive: every part is processed by all the
processors in parallel. When each processor should instead handle its own subset
of the input stream exclusively, we can rely on a switch operator.

In [None]:
from genai_processors import switch

input_stream = streams.stream_content([
    content_api.ProcessorPart("a1", substream_name="a"),
    content_api.ProcessorPart("b1", substream_name="b"),
    content_api.ProcessorPart("a2", substream_name="a"),
    content_api.ProcessorPart("b2", substream_name="b"),
    content_api.ProcessorPart("b3", substream_name="b"),
])

m = (
    switch.Switch(content_api.get_substream_name)
    .case("a", another_text_processor)
    .case("b", simple_text_processor)
    .default(processor.passthrough())
)

print("\nSwitch Processor Output:")
async for part in m(input_stream):
  print(part)

`m` is a switch processor that does not run in parallel but routes the input
parts based on the conditions defined in the case statements. The first case
condition that matches determines which processor handles the part. If no case
matches, the default option is run; in this example, it passes the part through
as is. Leaving the default out means that a part matching no case is dropped.

**TIP**: switch processors can be used to define branching conditions based on
parts in processor flows.
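
The routing rules described above (first matching case wins, optional default, unmatched parts dropped without a default) can be sketched in a few lines of plain Python. `Part` and `switch_sketch` are hypothetical names for this illustration. Unlike the library's `Switch`, this simplified version handles one part at a time with plain functions rather than with stream Processors.

```python
import asyncio
from collections.abc import AsyncIterable, AsyncIterator, Callable
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Part:
  """Hypothetical stand-in for ProcessorPart."""
  text: str
  substream_name: str = ""


Handler = Callable[[Part], Part]


async def as_stream(parts: list[Part]) -> AsyncIterator[Part]:
  for part in parts:
    yield part


async def switch_sketch(
    content: AsyncIterable[Part],
    cases: list[tuple[str, Handler]],
    default: Optional[Handler] = None,
) -> AsyncIterator[Part]:
  async for part in content:
    for name, handler in cases:
      if part.substream_name == name:
        yield handler(part)
        break  # Only the first matching case applies.
    else:
      # No case matched: use the default, or drop the part if there is none.
      if default is not None:
        yield default(part)


CASES = [
    ("a", lambda p: Part(p.text.upper(), p.substream_name)),
    ("b", lambda p: Part(p.text + "#", p.substream_name)),
]
PARTS = [Part("a1", "a"), Part("b1", "b"), Part("c1", "c")]


async def main() -> tuple[list[str], list[str]]:
  dropped = [p.text async for p in switch_sketch(as_stream(PARTS), CASES)]
  kept = [
      p.text
      async for p in switch_sketch(as_stream(PARTS), CASES, default=lambda p: p)
  ]
  return dropped, kept


without_default, with_default = asyncio.run(main())
print(without_default, with_default)
```

The part in substream `c` only survives in the second run, where an identity default plays the role of `processor.passthrough()`.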

## 7. 🤖 Using GenAI Models as Processors

The library provides built-in processors for interacting with Google's
Generative AI models. If you have not provided an API Key above, you can skip
this section.

In [None]:
from genai_processors.core import genai_model
from google.genai import types as genai_types

# Initialize the GenAI model processor
# Replace 'gemini-2.0-flash' with your desired model name
genai_processor = genai_model.GenaiModel(
    api_key=API_KEY,
    model_name="gemini-2.0-flash",
    generate_content_config=genai_types.GenerateContentConfig(temperature=0.7),
)

# Chain the GenAI processor with a processor to lowercase all inputs.
genai_pipeline = another_text_processor + genai_processor

input_prompt_genai = [
    "Explain the Concept of LARGE LANGUAGE MODELS",
    "in two sentences",
]
input_stream_genai = streams.stream_content(input_prompt_genai)

print("\nGenAI Pipeline Output:")
async for part in genai_pipeline(input_stream_genai):
  print(part.text)

The GenAI Processors library provides a `TTFTSingleStream` Processor to record
the Time-To-First-Token (TTFT) of any unidirectional streaming Processor. It
wraps the input Processor, keeping its original logic, and records the time from
invocation to the first output. This TTFT processor can only be used with GenAI
models that are not bidirectional (it cannot be applied to a `LiveProcessor`).

In [None]:
from genai_processors import debug

# Chain the GenAI processor with a processor to lowercase all inputs.
genai_pipeline = (
    another_text_processor
    # "GenAI Model" is the label identifying which processor the TTFT is for.
    + debug.TTFTSingleStream("GenAI Model", genai_processor)
)

input_prompt_genai = [
    "Explain the Concept of LARGE LANGUAGE MODELS",
    "in two sentences",
]
input_stream_genai = streams.stream_content(input_prompt_genai)

print("\nGenAI Pipeline Output:")
async for part in genai_pipeline(input_stream_genai):
  print(part.text)

The `GenAI Model TTFT=x.xx seconds` appears before the processor output and is
returned in the `status` substream.

Processors can also be used to easily connect to the Live API, as demonstrated
in the following example:

In [None]:
from genai_processors.core import live_model
from google.genai import types as genai_types
from IPython.display import Audio, display
import numpy as np

LIVE_MODEL_NAME = "gemini-2.0-flash-live-001"

live_processor = live_model.LiveProcessor(
    api_key=API_KEY,
    model_name=LIVE_MODEL_NAME,
    realtime_config=genai_types.LiveConnectConfig(
        # Basic configuration for real-time text and audio interaction
        output_audio_transcription={},  # Enable transcription of audio output
        realtime_input_config=genai_types.RealtimeInputConfig(
            turn_coverage=(  # Model sees all real-time input in a turn
                "TURN_INCLUDES_ALL_INPUT"
            )
        ),
        response_modalities=["AUDIO"],  # Request audio output
    ),
)


@processor.processor_function
async def collect_audio(
    content: AsyncIterable[content_api.ProcessorPart],
) -> AsyncIterable[content_api.ProcessorPart]:
  """Yields a single Part containing all the audio from `content`."""
  audio_bytes = b""
  async for part in content:
    if content_api.is_audio(part.mimetype):
      audio_bytes += part.bytes
    elif content_api.is_text(part.mimetype):
      print(part)
  # This is yielded when the input stream is closed.
  yield content_api.ProcessorPart(
      audio_bytes,
      mimetype="audio/l16;rate=24000",
  )


# We only add text here, but this can contain audio, images, etc. This would
# typically come from a camera, microphone, or other input source.
input_stream = streams.stream_content(
    [
        content_api.ProcessorPart(
            "How are you today?", substream_name="realtime"
        )
    ],
    # This is needed for this example only: we wait here to give enough time
    # for the model to generate audio before we close the stream.
    with_delay_sec=7,
)
print("\nLive Processor Output:")
p = live_processor + collect_audio
async for part in p(input_stream):
  audio_track = Audio(
      data=np.frombuffer(part.bytes, dtype=np.int16),
      rate=24000,
      autoplay=True,
  )
  display(audio_track)

## 8. 🧩 Working with PartProcessors

For operations that apply to individual `ProcessorPart` objects independently,
you can use the `PartProcessor` class, or the
`@processor.part_processor_function` decorator. A `PartProcessor` can be cast
into a `Processor` with the `to_processor()` method. The resulting Processor
runs on all Parts in the input stream concurrently while preserving the order of
the parts. When chained with another `Processor`, a `PartProcessor` is
implicitly cast into a `Processor`. The use of `to_processor()` is therefore not
always needed. But if you want to apply a `PartProcessor` to an
`AsyncIterable[content_api.ProcessorPart]`, you need to run this method.

When you define a `PartProcessor`, you can also add a `match` function that
defines which `Part` types this processor handles. While optional, specifying it
is recommended: the GenAI Processors library uses it to optimize how it
schedules asyncio tasks.

A match function has the following signature:

```python
def match(part: content_api.ProcessorPart) -> bool:
  """Returns False if `part` is irrelevant for the processor, True otherwise."""
  ...
```

The default implementation returns `True`, i.e. it assumes all parts should be
considered by the processor. It is OK to return `True` even if the part is not
relevant: the part is then handed to the processor, which can ignore it. On the
other hand, it is important to be correct when returning `False`: any part for
which `match` returns `False` will not be processed at all.

The `match` default implementation can be overridden in the `PartProcessor`
class or an ad-hoc `match` function can be provided as an extra parameter in the
`@processor.part_processor_function` decorator as shown below.

In [None]:
def match_text(part: content_api.ProcessorPart) -> bool:
  return content_api.is_text(part.mimetype)


@processor.part_processor_function(match_fn=match_text)
async def duplicate_part(
    part: content_api.ProcessorPart,
) -> AsyncIterable[content_api.ProcessorPart]:
  """Duplicates the input part."""
  yield part
  yield part


input_parts_duplicate = streams.stream_content(["A", "B"])

# To apply `duplicate_part` on the input *stream*, we need a Processor.
p = duplicate_part.to_processor()

print("\nPart Processor Output:")
async for part in p(input_parts_duplicate):
  print(part.text)

This will output: `A`, `A`, `B`, `B`.

The library offers a convenient way to create filters as `PartProcessors` using
the `create_filter` method that takes a `Callable[[ProcessorPart], bool]` as
input:

```python
# Creates a PartProcessor that only outputs the text parts. All other parts
# are dropped.
p = processor.create_filter(content_api.is_text)
```

### A Note on `to_processor()`

PartProcessors do not implement the Processor interface and can sometimes be
confused with Processors, which can lead to the following wrong code:

```python
# p is a PartProcessor defined somewhere else in the code.
p = part_processor

async def my_processor(
    content: AsyncIterable[ProcessorPart]
) -> AsyncIterable[ProcessorPart]:
  # This is an error as `p` expects a ProcessorPart and not an AsyncIterable.
  async for part in p(content):
    ...
```

This can be easily fixed by casting the PartProcessor into a Processor using the
`to_processor()` method.

```python
# p is now a Processor.
p = part_processor.to_processor()

async def my_processor(
    content: AsyncIterable[ProcessorPart]
) -> AsyncIterable[ProcessorPart]:
  # This is ok, `p` accepts AsyncIterables as input.
  async for part in p(content):
    ...
```

The `to_processor` method can be applied to Processors and PartProcessors alike.
When in doubt, apply it to ensure you can use your Processor with
AsyncIterables.

## 9. 🏎️ Parallel Execution of PartProcessors

You can run multiple PartProcessor instances in parallel using the `//`
operator.

In [None]:
@processor.part_processor_function
async def append_star(
    part: content_api.ProcessorPart,
) -> AsyncIterable[content_api.ProcessorPart]:
  """Appends a star to the text."""
  if content_api.is_text(part.mimetype):
    yield content_api.ProcessorPart(part.text + "*")


@processor.part_processor_function
async def append_hash(
    part: content_api.ProcessorPart,
) -> AsyncIterable[content_api.ProcessorPart]:
  """Appends a hash to the text."""
  if content_api.is_text(part.mimetype):
    yield content_api.ProcessorPart(part.text + "#")


parallel_processors = append_star // append_hash // processor.PASSTHROUGH_ALWAYS

input_parts_parallel = streams.stream_content([
    "Item_1",
    "Item_2",
    content_api.ProcessorPart(b"", mimetype="audio/l16;rate=24000"),
])

print("\nParallel Part Processors Output:")
async for part in parallel_processors.to_processor()(input_parts_parallel):
  print(part)

This will output: `Item_1*`, `Item_1#`, `Item_2*`, `Item_2#`, `<audio part>`.

The `//` operator is applied on `PartProcessors` only. All PartProcessors will
then run concurrently on the input parts, and their output sequences will be
concatenated in the order provided to the `//` expression. In this example,
adding a star is done before adding a hash, as `append_star` is before
`append_hash` in the expression. The input order is also respected, with
`Item_1` appearing before `Item_2` in the sequence.

For efficiency, input parts are not copied before passing them to the multiple
PartProcessors in a `//` expression; rather, the same object is passed. This
means the PartProcessors *should not* change any of the mutable attributes of
their input Part argument.
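
The following sketch illustrates why mutating a shared input is risky and how returning a modified copy avoids the problem. It uses a hypothetical `Part` dataclass with a mutable `metadata` dict in place of `ProcessorPart`; the attribute names are assumptions made for this example only.

```python
import dataclasses


@dataclasses.dataclass
class Part:
  """Hypothetical stand-in for ProcessorPart with a mutable attribute."""
  text: str
  metadata: dict[str, str] = dataclasses.field(default_factory=dict)


def tag_copy(part: Part) -> Part:
  """Safe: builds a new Part and leaves the shared input untouched."""
  return dataclasses.replace(part, metadata={**part.metadata, "tag": "star"})


def tag_in_place(part: Part) -> Part:
  """Unsafe in a `//` group: every other branch sees the change."""
  part.metadata["tag"] = "star"
  return part


shared = Part("Item_1")

tagged = tag_copy(shared)
metadata_after_copy = dict(shared.metadata)  # Still empty.

tag_in_place(shared)
metadata_after_mutation = dict(shared.metadata)  # Now carries the tag.

print(tagged.metadata, metadata_after_copy, metadata_after_mutation)
```

Since every branch of a `//` expression receives the same object, the in-place variant would leak the `tag` entry into whatever the other branches read from that part.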

When no output is returned by any of the individual PartProcessors, by default
nothing is returned from the full expression. A special mode can be triggered,
which ensures the input part is returned as-is if no Processor in the `//` group
returns anything:

```python
parallel_processors = (
  append_star // append_hash // processor.PASSTHROUGH_FALLBACK
)
```

The `//` operator is handy when PartProcessors pre-process the input based on
its type. A typical usage pattern is shown below, where each `xx_processor`
defines a pre-processing step for a given part type:

```python
p1 = processor.create_filter(content_api.is_image) + image_processor
p2 = processor.create_filter(content_api.is_audio) + audio_processor
total_processor = p1 // p2 // processor.PASSTHROUGH_FALLBACK
```

Similar to Processors, PartProcessors also have a switch statement. It operates
on PartProcessors and ensures that the parts in the output stream are ordered
the same way as in the input stream. The parts are processed concurrently,
however, so their processing should not depend on execution order.

In [None]:
from genai_processors import switch

input_stream = streams.stream_content([
    content_api.ProcessorPart("a1", substream_name="a"),
    content_api.ProcessorPart("b1", substream_name="b"),
    content_api.ProcessorPart("a2", substream_name="a"),
    content_api.ProcessorPart("b2", substream_name="b"),
    content_api.ProcessorPart("b3", substream_name="b"),
])

m = (
    switch.PartSwitch(content_api.get_substream_name)
    .case("a", append_star)
    .case("b", append_hash)
    .default(processor.passthrough())
)

print("\nPartSwitch Output:")
p = m.to_processor()
async for part in p(input_stream):
  print(part)

Note that the order of the output stream is consistent with the order of the
input stream.

`Switch` and `PartSwitch` accept the same case conditions. They differ in their
type (`Processor` versus `PartProcessor`) and in ordering: `PartSwitch` keeps
the output in input order, whereas `Switch` only preserves order among the parts
that go through the same case condition and the same Processor.

## 10. 🧱 Handling Different Content Types

The `content_api` module provides utilities for working with various content
types within `ProcessorPart` objects, such as accessing text, images, or custom
structured data.

In [None]:
import io
from PIL import Image

# Create a simple black image
img = Image.new("RGB", (60, 30), color="black")
img_byte_arr = io.BytesIO()
img.save(img_byte_arr, format="PNG")
img_bytes = img_byte_arr.getvalue()

image_part = content_api.ProcessorPart(img_bytes, mimetype="image/png")
text_part = content_api.ProcessorPart("Some text")

# Accessing content
print("\nContent API Examples:")
print(f"Text part text: {text_part.text}")
print(f"Image part mimetype: {image_part.mimetype}")

# Using content_api.as_text to extract text from a list of parts
all_parts = [text_part, image_part, content_api.ProcessorPart(" more text")]
print(f"Combined text from parts: {content_api.as_text(all_parts)}")

## 11. ⏩ Working with Streams and AsyncIterables

Processors operate on `AsyncIterable` streams of `ProcessorPart` objects. The
`streams` module provides helpful functions for managing these streams.

### Converting Iterables to AsyncIterables

The `streams.stream_content` function converts a standard Python iterable (like
a list) into an `AsyncIterable`, which is necessary for processing with
Processors.

In [None]:
import asyncio
from genai_processors import content_api, streams

iterable_data = ["Part 1", "Part 2", "Part 3"]
async_stream = streams.stream_content(iterable_data)

print("\nProcessing stream:")
async for part in async_stream:
  print(f"Received: {part}")

This is mostly used for tests to create an `AsyncIterable` easily from a list.
You can pass a `with_delay_sec` argument to `stream_content` to make sure the
items are not all yielded immediately.

### Gathering a Stream into a List

The `streams.gather_stream` function collects all items from an `AsyncIterable`
into a Python list. This is useful for consuming the entire output of a
processor when the stream is finite.

In [None]:
import asyncio
from genai_processors import content_api, streams

async_stream = streams.stream_content(
    [content_api.ProcessorPart("A"), content_api.ProcessorPart("B")]
)
gathered_list = await streams.gather_stream(async_stream)

print("\nGathered list from stream:")
print(gathered_list)
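
Gathering only terminates when the stream does. As a minimal sketch of that constraint, the standard-library snippet below collects a stream into a list and guards the operation with a timeout. `gather_sketch` and its `timeout_sec` parameter are hypothetical and are not part of `streams.gather_stream`.

```python
import asyncio
from collections.abc import AsyncIterable, AsyncIterator


async def gather_sketch(
    content: AsyncIterable[str], timeout_sec: float
) -> list[str]:
  """Collects a stream into a list, giving up after `timeout_sec`."""

  async def drain() -> list[str]:
    return [item async for item in content]

  return await asyncio.wait_for(drain(), timeout=timeout_sec)


async def finite() -> AsyncIterator[str]:
  yield "A"
  yield "B"


async def endless() -> AsyncIterator[str]:
  while True:
    await asyncio.sleep(0.01)
    yield "tick"


async def main() -> tuple[list[str], bool]:
  gathered = await gather_sketch(finite(), timeout_sec=1.0)
  try:
    await gather_sketch(endless(), timeout_sec=0.05)
    timed_out = False
  except asyncio.TimeoutError:
    timed_out = True
  return gathered, timed_out


gathered, timed_out = asyncio.run(main())
print(gathered, timed_out)
```

The finite stream is collected normally, while gathering the endless one never completes on its own and is cancelled by the timeout.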

### Splitting and Merging Streams

The `streams` module provides functions for splitting a single stream into
multiple identical streams (`streams.split`) and merging multiple streams into a
single stream (`streams.merge` and `streams.concat`). These are powerful tools
for building complex processing graphs where different parts of the pipeline
need to operate on the same input or combine results from different sources.

In [None]:
import asyncio
from genai_processors import content_api, processor, streams


@processor.processor_function
async def append_a(
    content: AsyncIterable[content_api.ProcessorPart],
) -> AsyncIterable[content_api.ProcessorPart]:
  async for part in content:
    yield content_api.ProcessorPart(part.text + "A")


@processor.processor_function
async def append_b(
    content: AsyncIterable[content_api.ProcessorPart],
) -> AsyncIterable[content_api.ProcessorPart]:
  async for part in content:
    yield content_api.ProcessorPart(part.text + "B")


initial_stream = streams.stream_content(
    ["Start", "Finish"],
    # We add a delay after yielding each item. This lets the "Start" items be
    # yielded first.
    with_delay_sec=0.001,
)

# Split the stream into two
stream1, stream2 = streams.split(initial_stream, n=2)

# Process each stream independently
processed_stream1 = append_a(stream1)
processed_stream2 = append_b(stream2)

# Merge the processed streams
merged_stream = streams.merge([processed_stream1, processed_stream2])

print("\nSplit and Merge Example Output:")
async for part in merged_stream:
  print(part.text)

This example splits the initial stream, processes each branch (appending "A" to
one and "B" to the other), and then merges the results. The output order in the
merged stream might vary depending on task scheduling. The `with_delay_sec`
delay makes sure all `Start` items are yielded first. If you remove it, the
scheduling will likely be different.
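
To see why the merged order depends on scheduling, here is a minimal sketch of a merge built on `asyncio.Queue`. `merge_sketch` and `timed` are hypothetical helpers written for this illustration; the sketch is an assumption about the general technique and not the implementation of `streams.merge`.

```python
import asyncio
from collections.abc import AsyncIterable, AsyncIterator


async def timed(items: list[str], delay_sec: float) -> AsyncIterator[str]:
  """Yields each item, then waits `delay_sec` before the next one."""
  for item in items:
    yield item
    await asyncio.sleep(delay_sec)


async def merge_sketch(
    sources: list[AsyncIterable[str]],
) -> AsyncIterator[str]:
  """Yields items from all sources as soon as each one is available."""
  queue: asyncio.Queue = asyncio.Queue()
  done = object()  # Sentinel marking the end of one source.

  async def pump(source: AsyncIterable[str]) -> None:
    try:
      async for item in source:
        queue.put_nowait(item)
    finally:
      queue.put_nowait(done)

  # Keep references to the tasks so they are not garbage collected.
  tasks = [asyncio.create_task(pump(source)) for source in sources]
  remaining = len(tasks)
  while remaining:
    item = await queue.get()
    if item is done:
      remaining -= 1
    else:
      yield item


async def main() -> list[str]:
  merged = merge_sketch(
      [timed(["a1", "a2"], 0.02), timed(["b1", "b2"], 0.03)]
  )
  return [item async for item in merged]


merged_output = asyncio.run(main())
print(merged_output)
```

Items from the same source always keep their relative order, but how items from different sources interleave depends on when each source produces them.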

You can create loops within streams using `streams.merge` and an
`asyncio.Queue` as follows:

In [None]:
input_stream = streams.stream_content(
    [content_api.ProcessorPart("Hello"), content_api.ProcessorPart("World")],
    # Adds a 0.1 second delay after streaming each part. This is needed in this
    # example to insert the content of the input_queue into the stream before it
    # is closed.
    with_delay_sec=0.1,
)
input_queue = asyncio.Queue()
stream_loop = streams.merge(
    [input_stream, streams.dequeue(input_queue)],
    stop_on_first=True,
)


async def inject_new_part():
  async for part in append_a(stream_loop):
    print(part.text)
    # Wait for 0.09 seconds to inject a new part before the next part is
    # streamed.
    await asyncio.sleep(0.09)
    # Inject a "new_part" Part in the stream_loop.
    await input_queue.put(content_api.ProcessorPart("new_part"))


# This will output: HelloA, new_partA, WorldA
asyncio.run(inject_new_part())

Note that `new_part` has been processed by the `append_a` processor as it was
injected into the `stream_loop` via the `input_queue`. The `stop_on_first`
argument should be set to `True` here to stop when the `input_stream` is over;
otherwise the loop will keep feeding itself with `new_part` indefinitely, as the
`input_queue` never ends.

This idiom lets you create complex pipelines where the output of a processor can
be processed and re-injected in the processor, a typical pattern with real-time
agents.

### Create Streams from Processor Sources

When the input stream comes from an external source like a microphone, a camera
or a network connection, the `@processor.source` decorator provides a
straightforward way to **originate** a stream of `ProcessorPart` objects and
easily create a "source" for your data. Think of it as a special kind of
`Processor` that **starts** a stream of `ProcessorPart` objects, rather than
just transforming an existing one.

You define a source by writing an `async` generator function that `yields` the
`ProcessorPart`s you want to produce. For example, here's a source that reads
input from the terminal:

In [None]:
@processor.source
async def TerminalInput(
    prompt: str,
) -> AsyncIterable[content_api.ProcessorPartTypes]:
  while True:
    input_text = await asyncio.to_thread(input, prompt)
    if input_text == 'q':
      break
    yield input_text


async for part in TerminalInput('>'):
  print(part)

The `@processor.source` decorator automatically turns your generator into a
full-fledged `Processor`.

This means:

1.  **It can be chained:** You can combine multiple sources (or sources with
    regular processors) using the `+` operator. The parts from one source will
    be merged with the parts generated by the next.

    ```python
    # Imagine audio_io.AudioIn and live_model.LiveProcessor are other processors
    # that produce or process data.
    p = TerminalInput('>') + audio_io.AudioIn(...) + live_model.LiveProcessor(...)
    # Here, TerminalInput starts the stream, which is then combined with
    # audio input, and finally fed into a live model.
    # endless_stream() is an empty stream that never ends.
    # It is often used when a source initiates the stream.
    async for part in p(streams.endless_stream()):
        # Process the combined output
        pass
    ```

    When chaining a source, its input stream (e.g., `streams.endless_stream()`)
    primarily serves to initiate the source's operation. The source then adds
    its generated parts to this stream, allowing subsequent processors in the
    chain to receive data from both the initial stream and the source.

2.  **It can be used as an input stream:** Since a source itself acts as an
    `AsyncIterable` of `ProcessorPart`s, you can directly pass it as the input
    to any other `Processor`.

    ```python
    # Here, TerminalInput generates parts, and live_model processes them.
    async for part in live_model.LiveProcessor(...)(TerminalInput('>')):
        # Process parts from the live model
        pass
    ```
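
The second usage, a source consumed directly as an input stream, follows from the fact that an async generator is itself an `AsyncIterable`. The sketch below shows the idea with the standard library only. `ticker` and `shout` are hypothetical names, and no `@processor.source` decorator is involved.

```python
import asyncio
from collections.abc import AsyncIterable, AsyncIterator


async def ticker(count: int, period_sec: float) -> AsyncIterator[str]:
  """Hypothetical source: originates items instead of transforming input."""
  for i in range(count):
    await asyncio.sleep(period_sec)
    yield f"tick {i}"


async def shout(content: AsyncIterable[str]) -> AsyncIterator[str]:
  """Downstream stage that uppercases whatever the source produces."""
  async for text in content:
    yield text.upper()


async def main() -> list[str]:
  # The source is an async iterable, so it can feed the next stage directly.
  return [text async for text in shout(ticker(3, 0.001))]


source_output = asyncio.run(main())
print(source_output)
```

The downstream stage does not need to know whether its input comes from a list, a queue, or a live device, as long as it is an async iterable.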

## 12. ➡️ Next Steps

This tutorial covered the basics of creating, applying, and chaining processors,
as well as working with different content types and GenAI models.

You can move to the
[create_your_own_processor](https://colab.research.google.com/github/google-gemini/genai-processors/blob/main/notebooks/create_your_own_processor.ipynb)
notebook to dive deeper into the development of new `Processors`.