genesyscloud/genesyscloud-audiohook/audiohook_blueprint.py (193 lines of code) (raw):

# Copyright 2024 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """ Module for receiving audio streaming from Audiohook Monitor and call Agent Assist backend """ import json import logging from dataclasses import dataclass, field from threading import Thread import numpy as np from flask import Blueprint from flask_sock import Sock from google.api_core.exceptions import NotFound from google.cloud import dialogflow_v2beta1 as dialogflow from simple_websocket import Server from audio_stream import Stream from audiohook import DEFAULT_CONVERSATION_ID, AudioHook from audiohook_config import config from dialogflow_api import (DialogflowAPI, await_redis, create_conversation_name, find_participant_by_role, location_id, project) audiohook_bp = Blueprint("audiohook", __name__) sock = Sock(audiohook_bp) logging.getLogger() logging.basicConfig( format='%(levelname)-8s [%(filename)s:%(lineno)d in ' 'function %(funcName)s] %(message)s', datefmt='%Y-%m-%d:%H:%M:%S', level=config.log_level.upper() ) @dataclass class OpenConversationState: """ Memorize the state after open message that is not a connection prob """ agent_thread: Thread user_thread: Thread conversation_name: str is_opened: bool def process_open_conversation_message( conversation_id: str, dialogflow_api: DialogflowAPI, agent_stream: Stream, customer_stream: Stream, ws: Server, audiohook: AudioHook ) -> OpenConversationState: """Process "open" message get from Audiohook Monitor, and establish a state object for conversation_name, agent_thread, user_thread, and is_opened bool """ conversation_profile = dialogflow_api.get_conversation_profile( conversation_profile_name=config.conversation_profile_name) agent_audio_config = agent_stream.define_audio_config(conversation_profile) customer_audio_config = customer_stream.define_audio_config( conversation_profile) normalized_conversation_id = 'a' + conversation_id conversation_name = create_conversation_name( normalized_conversation_id, location_id, project) try: dialogflow_api.get_conversation( conversation_name) except NotFound as e: logging.warning("Error getting the conversation : %s", e) dialogflow_api.create_conversation( conversation_profile, normalized_conversation_id) try: participants_list = dialogflow_api.list_participant( conversation_name) participant_agent = find_participant_by_role( dialogflow.Participant.Role.HUMAN_AGENT, participants_list) participant_user = find_participant_by_role( dialogflow.Participant.Role.END_USER, participants_list) if not participant_agent: participant_agent = dialogflow_api.create_participant( conversation_name=conversation_name, role="HUMAN_AGENT") if not participant_user: participant_user = dialogflow_api.create_participant( conversation_name=conversation_name, role="END_USER") except NotFound as e: logging.error("Participants not found %s: ", e) agent_thread = Thread( target=dialogflow_api.maintained_streaming_analyze_content, args=( agent_stream, participant_agent, agent_audio_config)) user_thread = Thread( target=dialogflow_api.maintained_streaming_analyze_content, args=( customer_stream, participant_user, customer_audio_config)) ws.send(json.dumps(audiohook.create_opened_message())) return OpenConversationState( agent_thread, user_thread, conversation_name, True) def process_ongoing_conversation_messages( message: json, dialogflow_api: DialogflowAPI, audiohook: AudioHook, agent_stream: Stream, customer_stream: Stream, open_conversation_state: OpenConversationState, ws: Server) -> bool: """Process string messages that are not "open" and "ping" from Audiohook client through websocket. Note: Audiohook client passes Null UUIDs (00000000-0000-0000-0000-000000000000) as conversationId and participant.id parameters to identify connection probes. https://developer.genesys.cloud/devapps/audiohook/protocol-reference#openparameters Reference: https://developer.genesys.cloud/devapps/audiohook/protocol-reference Return: True, if there is a connection close message. So the outer loop for receiving audio can be completed False, for other messages to continue process messages from websocket """ message_type = message.get("type") match message_type: case "resumed": # The first paused message after open message sets the # closed to True, now after resume, need to flip the bit customer_stream.closed = False agent_stream.closed = False open_conversation_state.agent_thread.start() open_conversation_state.user_thread.start() case "paused": customer_stream.closed = True agent_stream.closed = True logging.debug("Audio stream is paused") case "close": # This "close" is for ending a real conversation agent_stream.closed = True customer_stream.closed = True agent_stream.terminate = True customer_stream.terminate = True ws.send(json.dumps(audiohook.create_close_message())) try: dialogflow_api.complete_conversation( open_conversation_state.conversation_name) except Exception as e: logging.error("Error completing conversation %s", e) # wait for the two thread to finish then terminate logging.debug("Stop streaming threads for customers and agents") return True case "discarded": start_time = message.get("START_TIME") duration = message.get("DURATION") logging.info( "Currently the audio stream has been paused from %s for about %s second", start_time, duration) return False def wait_for_redis_resume(open_conversation_state: OpenConversationState, audiohook: AudioHook, ws: Server): await_redis( open_conversation_state.conversation_name) # Always send the resume after awaiting the redis, don't stop the audio streaming # event if redis client is not set ws.send(json.dumps(audiohook.create_resume_message())) @sock.route('/connect') def audiohook_connect(ws: Server): """Genesys Cloud Audiohook connector Args: ws (Server): Websocket server for exchange messages """ agent_stream = Stream( config.rate, chunk_size=config.chunk_size) customer_stream = Stream( config.rate, chunk_size=config.chunk_size) dialogflow_api = DialogflowAPI() audiohook = AudioHook() logging.info( "Audiohook client connected with the interceptor server") open_conversation_state = None while True: data = ws.receive() if isinstance(data, str): try: json_message = json.loads(data) except ValueError as e: logging.warning( "Not a valid JSON message %s, error details %s ", data, e) continue message_type = json_message.get("type") logging.info( "Handle %s message %s", message_type, json_message) conversation_id = json_message.get("parameters", {}).get( "conversationId", DEFAULT_CONVERSATION_ID) audiohook.set_session_id(json_message.get("id", 0)) audiohook.set_client_sequence(json_message.get("seq")) if message_type == "open": if conversation_id == DEFAULT_CONVERSATION_ID: logging.debug( "Connection Probe, not creating Dialogflow Conversation") ws.send(json.dumps(audiohook.create_opened_message())) elif conversation_id != DEFAULT_CONVERSATION_ID and open_conversation_state is None: # Get the first "open" message for real conversation # open_state contains the agent and user thread for # calling streaming_analyze_content # a bool flag indicating if conversation, participants have been initialized # and the conversation_name for the dialogflow.Conversation object open_conversation_state = process_open_conversation_message( conversation_id, dialogflow_api, agent_stream, customer_stream, ws, audiohook, ) logging.debug( "open conversation message %s ", open_conversation_state) # Check if the redis client has join_room called from the # agent assist backend. Before setting conversation_name in the redis client, # we should not publish any messages to the redis client # otherwise e UI modules will not receive pub/subs until redis connects the conversation await_redis_thread = Thread(target=wait_for_redis_resume, args=( open_conversation_state, audiohook, ws)) await_redis_thread.start() elif message_type == "ping": ws.send(json.dumps(audiohook.create_pong_message())) elif message_type == "close" and open_conversation_state is None: # This "close" is for a connection prob, we don't need to call dialogflow # to complete conversation and terminate the stream in this case ws.send(json.dumps(audiohook.create_close_message())) break elif open_conversation_state is not None: # Close websocket connection when receive a "close message" if (process_ongoing_conversation_messages(json_message, dialogflow_api, audiohook, agent_stream, customer_stream, open_conversation_state, ws, )): logging.info( "Disconnecting Audiohook with the server") break else: # audio is a 2-channel interleaved 8-bit PCMU audio stream # which is separated into single streams # using numpy # stream the audio to pub/sub array = np.frombuffer(data, dtype=np.int8) reshaped = array.reshape( (int(len(array) / 2), 2)) # append audio to customer audio buffer customer_stream.fill_buffer(reshaped[:, 0].tobytes()) # append audio to agent audio buffer agent_stream.fill_buffer(reshaped[:, 1].tobytes())