in genesyscloud/genesyscloud-audiohook/audiohook_blueprint.py [0:0]
def audiohook_connect(ws: Server):
"""Genesys Cloud Audiohook connector
Args:
ws (Server): Websocket server for exchange messages
"""
agent_stream = Stream(
config.rate, chunk_size=config.chunk_size)
customer_stream = Stream(
config.rate, chunk_size=config.chunk_size)
dialogflow_api = DialogflowAPI()
audiohook = AudioHook()
logging.info(
"Audiohook client connected with the interceptor server")
open_conversation_state = None
while True:
data = ws.receive()
if isinstance(data, str):
try:
json_message = json.loads(data)
except ValueError as e:
logging.warning(
"Not a valid JSON message %s, error details %s ", data, e)
continue
message_type = json_message.get("type")
logging.info(
"Handle %s message %s", message_type, json_message)
conversation_id = json_message.get("parameters", {}).get(
"conversationId", DEFAULT_CONVERSATION_ID)
audiohook.set_session_id(json_message.get("id", 0))
audiohook.set_client_sequence(json_message.get("seq"))
if message_type == "open":
if conversation_id == DEFAULT_CONVERSATION_ID:
logging.debug(
"Connection Probe, not creating Dialogflow Conversation")
ws.send(json.dumps(audiohook.create_opened_message()))
elif conversation_id != DEFAULT_CONVERSATION_ID and open_conversation_state is None:
# Get the first "open" message for real conversation
# open_state contains the agent and user thread for
# calling streaming_analyze_content
# a bool flag indicating if conversation, participants have been initialized
# and the conversation_name for the dialogflow.Conversation object
open_conversation_state = process_open_conversation_message(
conversation_id,
dialogflow_api,
agent_stream,
customer_stream,
ws,
audiohook,
)
logging.debug(
"open conversation message %s ", open_conversation_state)
# Check if the redis client has join_room called from the
# agent assist backend. Before setting conversation_name in the redis client,
# we should not publish any messages to the redis client
# otherwise e UI modules will not receive pub/subs until redis connects the conversation
await_redis_thread = Thread(target=wait_for_redis_resume, args=(
open_conversation_state, audiohook, ws))
await_redis_thread.start()
elif message_type == "ping":
ws.send(json.dumps(audiohook.create_pong_message()))
elif message_type == "close" and open_conversation_state is None:
# This "close" is for a connection prob, we don't need to call dialogflow
# to complete conversation and terminate the stream in this case
ws.send(json.dumps(audiohook.create_close_message()))
break
elif open_conversation_state is not None:
# Close websocket connection when receive a "close message"
if (process_ongoing_conversation_messages(json_message,
dialogflow_api,
audiohook,
agent_stream,
customer_stream,
open_conversation_state, ws,
)):
logging.info(
"Disconnecting Audiohook with the server")
break
else:
# audio is a 2-channel interleaved 8-bit PCMU audio stream
# which is separated into single streams
# using numpy
# stream the audio to pub/sub
array = np.frombuffer(data, dtype=np.int8)
reshaped = array.reshape(
(int(len(array) / 2), 2))
# append audio to customer audio buffer
customer_stream.fill_buffer(reshaped[:, 0].tobytes())
# append audio to agent audio buffer
agent_stream.fill_buffer(reshaped[:, 1].tobytes())