src/agents/voice/workflow.py (43 lines of code) (raw):
from __future__ import annotations
import abc
from collections.abc import AsyncIterator
from typing import Any
from ..agent import Agent
from ..items import TResponseInputItem
from ..result import RunResultStreaming
from ..run import Runner
class VoiceWorkflowBase(abc.ABC):
"""
A base class for a voice workflow. You must implement the `run` method. A "workflow" is any
code you want, that receives a transcription and yields text that will be turned into speech
by a text-to-speech model.
In most cases, you'll create `Agent`s and use `Runner.run_streamed()` to run them, returning
some or all of the text events from the stream. You can use the `VoiceWorkflowHelper` class to
help with extracting text events from the stream.
If you have a simple workflow that has a single starting agent and no custom logic, you can
use `SingleAgentVoiceWorkflow` directly.
"""
@abc.abstractmethod
def run(self, transcription: str) -> AsyncIterator[str]:
"""
Run the voice workflow. You will receive an input transcription, and must yield text that
will be spoken to the user. You can run whatever logic you want here. In most cases, the
final logic will involve calling `Runner.run_streamed()` and yielding any text events from
the stream.
"""
pass
class VoiceWorkflowHelper:
@classmethod
async def stream_text_from(cls, result: RunResultStreaming) -> AsyncIterator[str]:
"""Wraps a `RunResultStreaming` object and yields text events from the stream."""
async for event in result.stream_events():
if (
event.type == "raw_response_event"
and event.data.type == "response.output_text.delta"
):
yield event.data.delta
class SingleAgentWorkflowCallbacks:
def on_run(self, workflow: SingleAgentVoiceWorkflow, transcription: str) -> None:
"""Called when the workflow is run."""
pass
class SingleAgentVoiceWorkflow(VoiceWorkflowBase):
"""A simple voice workflow that runs a single agent. Each transcription and result is added to
the input history.
For more complex workflows (e.g. multiple Runner calls, custom message history, custom logic,
custom configs), subclass `VoiceWorkflowBase` and implement your own logic.
"""
def __init__(self, agent: Agent[Any], callbacks: SingleAgentWorkflowCallbacks | None = None):
"""Create a new single agent voice workflow.
Args:
agent: The agent to run.
callbacks: Optional callbacks to call during the workflow.
"""
self._input_history: list[TResponseInputItem] = []
self._current_agent = agent
self._callbacks = callbacks
async def run(self, transcription: str) -> AsyncIterator[str]:
if self._callbacks:
self._callbacks.on_run(self, transcription)
# Add the transcription to the input history
self._input_history.append(
{
"role": "user",
"content": transcription,
}
)
# Run the agent
result = Runner.run_streamed(self._current_agent, self._input_history)
# Stream the text from the result
async for chunk in VoiceWorkflowHelper.stream_text_from(result):
yield chunk
# Update the input history and current agent
self._input_history = result.to_input_list()
self._current_agent = result.last_agent