in genesyscloud/genesyscloud-audiohook/audio_stream.py [0:0]
def generator(self):
    """Stream audio from the Genesys Audiohook monitor to the API and a local buffer.

    Each call starts (or restarts) one streaming session:

    1. Bump ``restart_counter``, clear ``is_final`` and advance
       ``last_start_time`` by ``is_final_offset``.
    2. If some audio was already processed, replay the tail of
       ``audio_input_chunks`` that lies after the processed offset, capped
       at ``config.max_lookback`` worth of audio.
    3. Forward live chunks from ``self._buff``, recording every forwarded
       chunk in ``audio_input_chunks`` so a later restart can replay it.

    The session ends when the stream is closed, ``is_final`` is set, the
    queue stays empty for 0.5 seconds, a ``None`` sentinel is read from the
    queue, ``speech_end_offset`` passes the hard stop threshold, or the
    consumer closes the generator.

    Yields:
        bytes: audio payloads to send; all chunks available at the same
        moment are joined into a single payload.
    """
    # MULAW (G.711) audio is 8 bits per sample; dividing by 8 converts bits
    # to bytes. Reference: https://en.wikipedia.org/wiki/G.711
    bits_per_sample = 8
    # Genesys streams audio non-stop, so force a half close when approaching
    # the 120 second limit (offsets are presumably in milliseconds).
    max_speech_end_offset = 110000

    # Handle restart.
    logging.debug("Restart generator")
    self.restart_counter += 1
    # After the restart of the streaming, set is_final to False
    # to resume populating audio data
    self.is_final = False
    total_processed_time = self.last_start_time + self.is_final_offset
    # ApproximateBytes = Rate (samples per second) * Duration (seconds)
    #                    * BitDepth (bits per sample) / 8
    # The trailing division by 1000 suggests the duration is in milliseconds.
    processed_bytes_length = (
        int(total_processed_time * self._rate * bits_per_sample / 8) / 1000
    )
    logging.debug(
        "last start time is %s, is final offset: %s, total processed time %s",
        self.last_start_time,
        self.is_final_offset,
        total_processed_time)
    self.last_start_time = total_processed_time

    # Send out bytes stored in self.audio_input_chunks that come after the
    # processed_bytes_length.
    if processed_bytes_length != 0:
        audio_bytes = b"".join(self.audio_input_chunks)
        unprocessed_length = int(len(audio_bytes) - processed_bytes_length)
        need_to_process_bytes = b""
        # Only replay when there is something left to replay. A length of 0
        # must not reach the slice below: ``audio_bytes[-0:]`` is the WHOLE
        # buffer, which would resend audio that was already processed.
        if unprocessed_length > 0:
            # Look back for unprocessed audio, capped at the configured
            # maximum lookback window (same bytes formula as above).
            max_lookback_length = int(
                config.max_lookback * self._rate * bits_per_sample / 8)
            need_to_process_length = min(
                unprocessed_length, max_lookback_length)
            if need_to_process_length > 0:
                need_to_process_bytes = audio_bytes[-need_to_process_length:]
        logging.debug(
            "Sending need to process bytes length %s, total audio byte length %s, processed byte length %s ",
            len(need_to_process_bytes),
            len(audio_bytes),
            processed_bytes_length)
        if need_to_process_bytes:
            try:
                yield need_to_process_bytes
            except GeneratorExit as e:
                logging.debug(
                    "Generator exit from the need to process step %s", e)
                return

    try:
        while not self.closed and not self.is_final:
            if self.speech_end_offset > max_speech_end_offset:
                # Put a hard stop and produce a forced half close.
                self.is_final = True
                break
            # Use a blocking get() to ensure there's at least one chunk of
            # data, and stop iteration if the chunk is None, indicating the
            # end of the audio stream.
            try:
                chunk = self._buff.get(block=True, timeout=0.5)
            except queue.Empty:
                logging.debug(
                    "queue is empty break the loop and stop generator")
                break
            if chunk is None:
                logging.debug(
                    "chunk is none half close the stream by stopping generates requests")
                return
            data = [chunk]
            # Now drain the rest of the chunks, if any are left in _buff.
            end_of_stream = False
            while True:
                try:
                    chunk = self._buff.get(block=False)
                except queue.Empty:
                    # queue is empty quitting the loop
                    break
                if chunk is None:
                    logging.debug(
                        "Remaining chunk is none half close the stream")
                    end_of_stream = True
                    break
                data.append(chunk)
            # Record and forward what was collected, even when the sentinel
            # followed it; otherwise the final audio before the end of the
            # stream would be silently dropped.
            self.audio_input_chunks.extend(data)
            yield b"".join(data)
            if end_of_stream:
                return
    except GeneratorExit as e:
        logging.debug("Generator exit after is_final set to true %s", e)
        return
    logging.debug("Stop generator")