def audiohook_connect()

in genesyscloud/genesyscloud-audiohook/audiohook_blueprint.py [0:0]


def audiohook_connect(ws: Server):
    """Genesys Cloud Audiohook connector

    Runs the receive loop for one Audiohook websocket connection. Text
    frames are parsed as JSON control messages and dispatched on their
    "type" field; every other frame is treated as interleaved two-channel
    audio and split into the customer and agent stream buffers.

    The loop ends when a "close" message arrives before any real
    conversation was opened (connection probe), or when
    process_ongoing_conversation_messages returns a truthy value.

    Args:
        ws (Server): Websocket server for exchange messages
    """
    agent_stream = Stream(
        config.rate, chunk_size=config.chunk_size)
    customer_stream = Stream(
        config.rate, chunk_size=config.chunk_size)

    dialogflow_api = DialogflowAPI()
    audiohook = AudioHook()
    logging.info(
        "Audiohook client connected with the interceptor server")
    # Stays None until the first "open" message of a real (non-probe)
    # conversation has been processed.
    open_conversation_state = None
    while True:
        data = ws.receive()
        if isinstance(data, str):
            try:
                json_message = json.loads(data)
            except ValueError as e:
                logging.warning(
                    "Not a valid JSON message %s, error details %s ", data, e)
                continue
            if not isinstance(json_message, dict):
                # Valid JSON that is not an object (list, string, number...)
                # has no "type"/"id"/"seq" fields to dispatch on; skip it
                # instead of failing on .get() below.
                logging.warning(
                    "Ignoring JSON message that is not an object: %s", data)
                continue
            message_type = json_message.get("type")
            logging.info(
                "Handle %s message %s", message_type, json_message)
            # "parameters" may be absent or explicitly null; both fall back
            # to the default (probe) conversation id.
            parameters = json_message.get("parameters") or {}
            conversation_id = parameters.get(
                "conversationId", DEFAULT_CONVERSATION_ID)
            audiohook.set_session_id(json_message.get("id", 0))
            audiohook.set_client_sequence(json_message.get("seq"))
            if message_type == "open":
                if conversation_id == DEFAULT_CONVERSATION_ID:
                    logging.debug(
                        "Connection Probe, not creating Dialogflow Conversation")
                    ws.send(json.dumps(audiohook.create_opened_message()))
                elif open_conversation_state is None:
                    # Get the first "open" message for real conversation
                    # open_state contains the agent and user thread for
                    # calling streaming_analyze_content
                    # a bool flag indicating if conversation, participants
                    # have been initialized
                    # and the conversation_name for the
                    # dialogflow.Conversation object
                    open_conversation_state = process_open_conversation_message(
                        conversation_id,
                        dialogflow_api,
                        agent_stream,
                        customer_stream,
                        ws,
                        audiohook,
                    )
                    logging.debug(
                        "open conversation message %s ", open_conversation_state)
                    # Check if the redis client has join_room called from the
                    # agent assist backend. Before setting conversation_name
                    # in the redis client, we should not publish any messages
                    # to the redis client, otherwise the UI modules will not
                    # receive pub/subs until redis connects the conversation
                    await_redis_thread = Thread(target=wait_for_redis_resume, args=(
                        open_conversation_state, audiohook, ws))
                    await_redis_thread.start()
                # A repeated "open" for an already opened conversation is
                # intentionally ignored.
            elif message_type == "ping":
                ws.send(json.dumps(audiohook.create_pong_message()))
            elif message_type == "close" and open_conversation_state is None:
                # This "close" is for a connection probe, we don't need to
                # call dialogflow to complete conversation and terminate the
                # stream in this case
                ws.send(json.dumps(audiohook.create_close_message()))
                break
            elif open_conversation_state is not None:
                # A truthy return value means the connection should be shut
                # down (presumably after a "close" message was handled --
                # confirm against process_ongoing_conversation_messages).
                if (process_ongoing_conversation_messages(json_message,
                                                          dialogflow_api,
                                                          audiohook,
                                                          agent_stream,
                                                          customer_stream,
                                                          open_conversation_state, ws,
                                                          )):
                    logging.info(
                        "Disconnecting Audiohook with the server")
                    break
        else:
            # audio is a 2-channel interleaved 8-bit PCMU audio stream
            # which is separated into single streams
            # using numpy
            # stream the audio to pub/sub
            samples = np.frombuffer(data, dtype=np.int8)
            if samples.size % 2:
                # An odd byte count cannot be split into (customer, agent)
                # pairs and would make reshape raise; drop the dangling byte
                # so one malformed frame does not tear down the connection.
                logging.warning(
                    "Audio frame has odd size %d, dropping trailing byte",
                    samples.size)
                samples = samples[:-1]
            stereo = samples.reshape((samples.size // 2, 2))
            # append audio to customer audio buffer (first channel)
            customer_stream.fill_buffer(stereo[:, 0].tobytes())
            # append audio to agent audio buffer (second channel)
            agent_stream.fill_buffer(stereo[:, 1].tobytes())