core/audio_io.py (86 lines of code) (raw):
# Copyright 2025 DeepMind Technologies Limited. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Audio processors."""
import asyncio
from typing import AsyncIterable, Optional
from genai_processors import content_api
from genai_processors import processor
import pyaudio
# Short alias for the part type produced and consumed by the processors below.
ProcessorPart = content_api.ProcessorPart
# Chunk size used by `PyAudioIn`: it is passed as `frames_per_buffer` when the
# input stream is opened and as the size argument of every `read` call.
# NOTE(review): the name says "OUT" and the previous comment said "in bytes",
# but the value is only used on the input side and as a frame count -- confirm
# the intended unit and rename if appropriate.
AUDIO_OUT_CHUNK_SIZE = 1024
# Add accepted audio formats here.
# NOTE(review): the pyaudio sample-format constants appear to be plain ints, in
# which case `|` is a bitwise OR that yields a single int rather than a union
# type -- confirm before relying on this name as a type annotation.
AudioFormats = pyaudio.paInt16 | pyaudio.paInt24
@processor.source
async def PyAudioIn(
pya: pyaudio.PyAudio,
substream_name: str = "realtime",
audio_format: AudioFormats = pyaudio.paInt16, # 16-bit PCM.
channels: int = 1,
rate: int = 24000,
use_pcm_mimetype: bool = False,
):
"""Receives audio input and inserts it into the input stream.
The audio input is received from the default input device.
Args:
pya: The pyaudio object to use for capturing audio.
substream_name: The name of the substream that will contain all the audio
parts captured from the mic.
audio_format: The audio format to use for the audio.
channels: The number of channels in the audio.
rate: The sample rate of the audio.
use_pcm_mimetype: Whether to use PCM mimetype instead of the more specific
l16 or l24 mimetype.
"""
mimetype = "audio/"
match audio_format:
case pyaudio.paInt16:
mimetype += "pcm" if use_pcm_mimetype else "l16"
case pyaudio.paInt24:
mimetype += "pcm" if use_pcm_mimetype else "l24"
case _:
raise ValueError(f"Unsupported audio format: {format}")
mimetype = f"{mimetype};rate={rate}"
mic_info = pya.get_default_input_device_info()
audio_stream = await asyncio.to_thread(
pya.open,
format=audio_format,
channels=channels,
rate=rate,
input=True,
input_device_index=mic_info["index"],
frames_per_buffer=AUDIO_OUT_CHUNK_SIZE,
)
if __debug__: # pylint: disable=undefined-variable
kwargs = {"exception_on_overflow": False}
else:
kwargs = {}
while True:
data = await asyncio.to_thread(
audio_stream.read, AUDIO_OUT_CHUNK_SIZE, **kwargs
)
yield ProcessorPart(
data, mimetype=mimetype, substream_name=substream_name, role="USER"
)
class PyAudioOut(processor.Processor):
  """Receives audio output from a live session and talks back to the user.

  Uses pyaudio to play audio back to the user.

  Non-audio parts are always passed through unchanged. Audio parts are queued
  for playback and are only forwarded downstream when the `passthrough_audio`
  param passed to the constructor is True.

  Combine this processor with `RateLimitAudio` to receive the audio chunks at
  the time where they need to be played back to the user.
  """

  def __init__(
      self,
      pya: pyaudio.PyAudio,
      audio_format=pyaudio.paInt16,  # 16-bit PCM.
      channels: int = 1,
      rate: int = 24000,
      passthrough_audio: bool = False,
  ):
    """Stores the playback configuration; no device is opened here.

    Args:
      pya: The pyaudio object used to open the output stream.
      audio_format: The pyaudio sample format of the audio to play.
      channels: The number of channels in the audio.
      rate: The sample rate of the audio.
      passthrough_audio: Whether audio parts are also yielded downstream after
        being queued for playback.
    """
    self._pya = pya
    self._format = audio_format
    self._channels = channels
    self._rate = rate
    self._passthrough_audio = passthrough_audio

  async def call(
      self, content: AsyncIterable[ProcessorPart]
  ) -> AsyncIterable[ProcessorPart]:
    """Receives audio output from a live session.

    Opens an output stream, plays every audio part of `content` on it from a
    background task, and yields the parts that are passed through.

    Args:
      content: The incoming stream of parts.

    Yields:
      Every non-audio part, plus every audio part when `passthrough_audio` is
      set.
    """
    # `None` is the end-of-stream sentinel for the playback task.
    audio_output = asyncio.Queue[Optional[ProcessorPart]]()

    # Opening the device is a blocking call, so it runs in a worker thread.
    stream = await asyncio.to_thread(
        self._pya.open,
        format=self._format,
        channels=self._channels,
        rate=self._rate,
        output=True,
    )

    async def play_audio():  # pylint: disable=invalid-name
      # Compare against the sentinel explicitly so that a part which happens
      # to evaluate as falsy cannot end playback early.
      while (part := await audio_output.get()) is not None:
        if part.part.inline_data is not None:
          # `write` blocks while the audio plays; keep it off the event loop.
          await asyncio.to_thread(stream.write, part.part.inline_data.data)

    play_audio_task = processor.create_task(play_audio())

    try:
      async for part in content:
        if content_api.is_audio(part.mimetype):
          audio_output.put_nowait(part)
          if self._passthrough_audio:
            yield part
        else:
          yield part
      await audio_output.put(None)
    finally:
      # Always stop the playback task, including when the input stream raises
      # or the consumer closes this generator early; otherwise the task would
      # be left waiting on the queue.
      # NOTE(review): the pyaudio output stream itself is not closed here.
      play_audio_task.cancel()