core/text_to_speech.py (89 lines of code) (raw):

# Copyright 2025 DeepMind Technologies Limited. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Streams audio content parts from text parts.

Uses Google Text-To-Speech API to generate audio parts from text parts.

Install google cloud Text-To-Speech speech client with:

```python
pip install --upgrade google-cloud-texttospeech
```

See the `text_to_speech_cli.py` script for a usage example and how to test it
locally.
"""

import asyncio
from collections.abc import AsyncIterable

from genai_processors import content_api
from genai_processors import processor
from genai_processors import streams
from google.cloud import texttospeech_v1 as texttospeech

ProcessorPart = content_api.ProcessorPart


class TextToSpeech(processor.Processor):
  """Streams audio content parts from text parts."""

  def __init__(
      self,
      project_id: str,
      language_code: str = 'en-US',
      voice_name: str = 'en-US-Chirp3-HD-Charon',
      with_text_passthrough: bool = True,
  ):
    """Initializes the TextToSpeech processor.

    This processor uses the Google Text-To-Speech API to generate audio parts
    from text parts. The audio parts are yielded as `ProcessorPart` objects
    with the `audio_content` attribute set to the audio bytes and the
    `mimetype` attribute set to `audio/l16;rate=24000`.

    Args:
      project_id: The project ID to use for the Text-To-Speech API.
      language_code: The language code to use for the Text-To-Speech API.
      voice_name: The voice name to use for the Text-To-Speech API. See list
        of voices here:
        https://cloud.google.com/text-to-speech/docs/chirp3-hd
      with_text_passthrough: Whether to passthrough the text parts to the
        output stream. When set to True, the text parts are yielded back.
    """
    # One config request is built up-front; the streaming API requires it to
    # be the first request on every new synthesize stream.
    streaming_config = texttospeech.StreamingSynthesizeConfig(
        streaming_audio_config=texttospeech.StreamingAudioConfig(
            audio_encoding=texttospeech.AudioEncoding.PCM,
            sample_rate_hertz=24000,
        ),
        voice=texttospeech.VoiceSelectionParams(
            name=voice_name,
            language_code=language_code,
        ),
    )
    self._config_request = texttospeech.StreamingSynthesizeRequest(
        streaming_config=streaming_config
    )
    # NOTE(review): _project_id is stored but never used when creating the
    # TextToSpeechAsyncClient below — presumably the client picks up the
    # project from application default credentials; confirm this is intended.
    self._project_id = project_id
    self._with_text_passthrough = with_text_passthrough

  async def call(
      self, content: AsyncIterable[ProcessorPart]
  ) -> AsyncIterable[ProcessorPart]:
    """Streams audio content parts from text parts.

    The order between TTS-processed parts and pass-through parts is not
    maintained. This processor treats all its inputs as realtime and sends
    them to the output as soon as possible. Order within TTS-processed and
    pass-through parts is maintained, but we don't wait for the TTS result to
    emit the next pass-through part.

    Args:
      content: The input stream of content to process. Non-text parts are
        passed through unchanged.

    Yields:
      The audio parts generated by the Text-To-Speech API from the text parts
      in `content`. If `with_text_passthrough` is True, the text parts are
      yielded back as well. All non-text parts are yielded unchanged.
    """
    # The output queue is used to yield the audio parts unchanged in the
    # output stream when _with_text_passthrough is True. A `None` sentinel
    # terminates it.
    output_queue = asyncio.Queue[ProcessorPart | None]()
    first_chunk_received = asyncio.Event()

    async def request_stream(
        request_queue: asyncio.Queue[
            texttospeech.StreamingSynthesizeRequest | None
        ],
    ):
      # Fans the input stream out: pass-through parts go straight to
      # output_queue; text parts additionally become synthesize requests.
      try:
        request_queue.put_nowait(self._config_request)
        async for part in content:
          if (
              not content_api.is_text(part.mimetype)
              or self._with_text_passthrough
          ):
            output_queue.put_nowait(part)
          if not content_api.is_text(part.mimetype) or not part.text:
            continue
          first_chunk_received.set()
          request_queue.put_nowait(
              texttospeech.StreamingSynthesizeRequest(
                  input=texttospeech.StreamingSynthesisInput(
                      text=part.text,
                  )
              )
          )
      finally:
        # Always unblock the waiter and close the request stream, even when
        # the input contained no text or an exception was raised.
        first_chunk_received.set()
        request_queue.put_nowait(None)

    async def send_text_to_speech_requests():
      try:
        request_queue = asyncio.Queue[
            texttospeech.StreamingSynthesizeRequest | None
        ]()
        enqueue_request_task = processor.create_task(
            request_stream(request_queue)
        )
        # Wait until the first request is sent to the Speech API to avoid the
        # client being created before the first request is sent.
        # The client can indeed only stay up for 5 seconds without any request
        # and then it needs to be re-created.
        await first_chunk_received.wait()
        client = texttospeech.TextToSpeechAsyncClient()
        streaming_responses = await client.streaming_synthesize(
            requests=streams.dequeue(request_queue)
        )
        async for response in streaming_responses:
          output_queue.put_nowait(
              ProcessorPart(
                  response.audio_content,
                  mimetype='audio/l16;rate=24000',
                  role='model',
              )
          )
        # Surface any exception raised while enqueueing requests.
        await enqueue_request_task
      finally:
        output_queue.put_nowait(None)

    send_task = processor.create_task(send_text_to_speech_requests())
    # Explicit `is not None` check: only the sentinel ends the stream. A bare
    # truthiness test (`while part := ...`) would silently stop on a
    # falsy-but-valid part (e.g. an empty pass-through part).
    while (part := await output_queue.get()) is not None:
      yield part
    await send_task