# genesyscloud/genesyscloud-audiohook/audio_stream.py (105 lines of code)

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Referenced implementation from
https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/dialogflow/streaming_transcription.py
"""

import logging
import queue

import google.cloud.dialogflow_v2beta1 as dialogflow

from audiohook_config import config


class Stream:
    """Opens a stream as a generator yielding the audio chunks.

    The generator method returns an iterator that contains subsequent audio
    bytes received from audio source to the StreamingAnalyzeContentRequest
    Reference:
    https://cloud.google.com/python/docs/reference/dialogflow/latest/google.cloud.dialogflow_v2.services.participants.ParticipantsAsyncClient
    """

    def __init__(self, rate, chunk_size):
        """Initializes stream state.

        Args:
            rate: Audio sample rate in hertz (MULAW is 8-bit, so one sample
                is one byte).
            chunk_size: Size of each audio chunk pushed into the buffer.
        """
        self._rate = rate
        self.chunk_size = chunk_size
        self._num_channels = 1
        # Thread-safe buffer that fill_buffer() feeds and generator() drains.
        self._buff = queue.Queue()
        self.is_final = False
        self.closed = False
        # NOTE: the original code assigned `terminate` twice; assign once.
        self.terminate = False
        # Count the number of times the stream analyze content restarts.
        self.restart_counter = 0
        self.last_start_time = 0
        self.total_input_audio_time = 0
        # Time end of the last is_final in millisecond since last_start_time.
        self.is_final_offset = 0
        # Time end of the interim result speech end offset in second
        self.speech_end_offset = 0
        # Save the audio chunks generated from the start of the audio stream
        # for replay after restart.
        self.audio_input_chunks = []
        self.new_stream = True
        # Only MULAW audio encodings are currently supported in Audiohook
        # Monitor
        self.audio_encoding = dialogflow.AudioEncoding.AUDIO_ENCODING_MULAW

    def fill_buffer(self, in_data, *args, **kwargs):
        """Pushes one chunk of raw audio bytes (or None to end) into the buffer."""
        self._buff.put(in_data)

    def define_audio_config(
            self, conversation_profile: dialogflow.ConversationProfile):
        """Builds the Dialogflow input audio config for this stream.

        The Audiohook client will currently only offer PCMU.
        Reference:
        https://cloud.google.com/agent-assist/docs/extended-streaming
        https://developer.genesys.cloud/devapps/audiohook/session-walkthrough#audio-streaming

        Args:
            conversation_profile: Profile providing language and STT settings.

        Returns:
            dialogflow.InputAudioConfig for StreamingAnalyzeContent requests.
        """
        language_code = conversation_profile.language_code or "en-US"
        stt_model = conversation_profile.stt_config.model or "phone_call"
        audio_input_config = dialogflow.InputAudioConfig(
            audio_encoding=self.audio_encoding,
            sample_rate_hertz=self._rate,
            language_code=language_code,
            model=stt_model,
            model_variant="USE_ENHANCED",
            enable_automatic_punctuation=True)
        logging.debug("Input audio config %s ", audio_input_config)
        return audio_input_config

    def generator(self):
        """Stream Audio from Genesys Audiohook Monitor to API and to local buffer.

        On restart, first replays any buffered-but-unprocessed audio (capped
        at config.max_lookback seconds), then yields live chunks from the
        buffer until the stream is closed, half-closed (None chunk), or the
        streaming time limit is approached.

        Yields:
            bytes: Concatenated raw audio chunks.
        """
        # Handle restart.
        logging.debug("Restart generator")
        self.restart_counter += 1
        # After the restart of the streaming, set is_final to False
        # to resume populating audio data
        self.is_final = False
        total_processed_time = self.last_start_time + self.is_final_offset
        # ApproximateBytes = Rate(samples/s) * Duration(s) * BitDepth(bits/sample) / 8.
        # MULAW is 8-bit depth, so bits/sample / 8 == 1 byte per sample;
        # total_processed_time is in milliseconds, hence the / 1000.
        # reference https://en.wikipedia.org/wiki/G.711
        # FIX: convert the whole expression to int so the byte length is not
        # a float (the original applied int() before dividing by 1000).
        processed_bytes_length = int(total_processed_time * self._rate / 1000)
        logging.debug(
            "last start time is %s, is final offset: %s, total processed time %s",
            self.last_start_time, self.is_final_offset, total_processed_time)
        self.last_start_time = total_processed_time
        # Send out bytes stored in self.audio_input_chunks that is after the
        # processed_bytes_length.
        if processed_bytes_length != 0:
            audio_bytes = b"".join(self.audio_input_chunks)
            # Lookback for unprocessed audio data, capped at
            # config.max_lookback seconds worth of bytes.
            # FIX: clamp at zero — when everything buffered has already been
            # processed, a length of 0 would make the slice below
            # audio_bytes[0:] and replay the ENTIRE audio stream.
            need_to_process_length = max(
                0,
                min(
                    len(audio_bytes) - processed_bytes_length,
                    int(config.max_lookback * self._rate),
                ),
            )
            if need_to_process_length > 0:
                need_to_process_bytes = audio_bytes[-need_to_process_length:]
                logging.debug(
                    "Sending need to process bytes length %s, total audio byte length %s, processed byte length %s ",
                    len(need_to_process_bytes),
                    len(audio_bytes),
                    processed_bytes_length)
                try:
                    yield need_to_process_bytes
                except GeneratorExit as e:
                    logging.debug(
                        "Generator exit from the need to process step %s", e)
                    return
        try:
            while not self.closed and not self.is_final:
                if self.speech_end_offset > 110000:
                    # because Genesys is streaming non-stop the audio,
                    # put a hard stop when approaching 120 second limit and
                    # produce a force half close
                    self.is_final = True
                    break
                data = []
                # Use a blocking get() to ensure there's at least one chunk of
                # data, and stop iteration if the chunk is None, indicating the
                # end of the audio stream.
                try:
                    chunk = self._buff.get(block=True, timeout=0.5)
                except queue.Empty:
                    logging.debug(
                        "queue is empty break the loop and stop generator")
                    break
                if chunk is None:
                    logging.debug(
                        "chunk is none half close the stream by stopping generates requests")
                    return
                data.append(chunk)
                # Now try to get the rest of the chunks if there are any left
                # in the _buff without blocking.
                while True:
                    try:
                        chunk = self._buff.get(block=False)
                        if chunk is None:
                            logging.debug(
                                "Remaining chunk is none half close the stream")
                            return
                        data.append(chunk)
                    except queue.Empty:
                        # queue is empty quitting the loop
                        break
                # Remember everything sent so it can be replayed on restart.
                self.audio_input_chunks.extend(data)
                if data:
                    yield b"".join(data)
        except GeneratorExit as e:
            logging.debug("Generator exit after is_final set to true %s", e)
            return
        logging.debug("Stop generator")