def generator()

in genesyscloud/genesyscloud-audiohook/audio_stream.py [0:0]


    def generator(self):
        """Stream Audio from Genesys Audiohook Monitor to API and to local buffer"""
        # Handle restart.
        logging.debug("Restart generator")
        self.restart_counter += 1
        # After the restart of the streaming, set is_final to False
        # to resume populating audio data
        self.is_final = False
        total_processed_time = self.last_start_time + self.is_final_offset
        # ApproximatesBytes = Rate(Sample per Second) * Duration(Seconds) *  BitRate(Bits per Sample) / 8
        # MULAW audio format is 8bit depth, 8000HZ then convert bits to bytes by
        # dividing 8
        # reference https://en.wikipedia.org/wiki/G.711
        processed_bytes_length = (
            int(total_processed_time * self._rate * 8 / 8) / 1000
        )
        logging.debug(
            "last start time is %s, is final offset: %s, total processed time %s",
            self.last_start_time,
            self.is_final_offset,
            total_processed_time)
        self.last_start_time = total_processed_time
        # Send out bytes stored in self.audio_input_chunks that is after the
        # processed_bytes_length.
        if processed_bytes_length != 0:
            audio_bytes = b"".join(self.audio_input_chunks)
            # Lookback for unprocessed audio data.
            # ApproximatesBytes = Rate(Sample per Second) * Duration(Seconds) *  BitRate(Bits per Sample) / 8
            # reference https://en.wikipedia.org/wiki/G.711
            need_to_process_length = min(
                int(len(audio_bytes) - processed_bytes_length),
                int(config.max_lookback * self._rate * 8 / 8),
            )
            # Note that you need to explicitly use `int` type for
            # substring.
            need_to_process_bytes = audio_bytes[(-1)
                                                * need_to_process_length:]
            logging.debug(
                "Sending need to process bytes length %s, total audio byte length %s, processed byte length %s ",
                len(need_to_process_bytes),
                len(audio_bytes),
                processed_bytes_length)
            try:
                yield need_to_process_bytes
            except GeneratorExit as e:
                logging.debug(
                    "Generator exit from the need to process step %s", e)
                return
        try:
            while not self.closed and not self.is_final:
                if self.speech_end_offset > 110000:
                    # because Genesys is streaming non-stop the audio,
                    # put a hard stop when approaching 120 second limit and produce a
                    # force half close
                    self.is_final = True
                    break
                data = []
                # Use a blocking get() to ensure there's at least one chunk of
                # data, and stop iteration if the chunk is None, indicating the
                # end of the audio stream.
                try:
                    chunk = self._buff.get(block=True, timeout=0.5)
                except queue.Empty:
                    logging.debug(
                        "queue is empty break the loop and stop generator")
                    break
                if chunk is None:
                    logging.debug(
                        "chunk is none half close the stream by stopping generates requests")
                    return
                data.append(chunk)
                # Now try to the rest of chunks if there are any left in the
                # _buff.
                while True:
                    try:
                        chunk = self._buff.get(block=False)
                        if chunk is None:
                            logging.debug(
                                "Remaining chunk is none half close the stream")
                            return
                        data.append(chunk)
                    except queue.Empty:
                        # queue is empty quitting the loop
                        break
                self.audio_input_chunks.extend(data)

                if data:
                    yield b"".join(data)
        except GeneratorExit as e:
            logging.debug("Generator exit after is_final set to true %s", e)
            return
        logging.debug("Stop generator")