genesyscloud/genesyscloud-audiohook/dialogflow_api.py (219 lines of code) (raw):

# Copyright 2024 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Module for interacting with Agent Assist Backend using dialogflow_v2beta1 API version Reference: https://cloud.google.com/python/docs/reference/dialogflow/latest/google.cloud.dialogflow_v2beta1 """ import logging import re import time import google.auth import redis from google.api_core.client_options import ClientOptions from google.api_core.exceptions import FailedPrecondition, OutOfRange, ResourceExhausted from google.cloud import dialogflow_v2beta1 as dialogflow from audio_stream import Stream from audiohook_config import config # Wait for 2 units of 0.5 second for the redis client to set conversation name AWAIT_REDIS_COUNTER = 2 AWAIT_REDIS_SECOND_PER_COUNTER = 0.5 LOCATION_ID_REGEX = r"^projects\/[^/]+\/locations\/([^/]+)" credentials, project = google.auth.default() redis_client = redis.StrictRedis( host=config.redis_host, port=config.redis_port) try: location_id = re.match( LOCATION_ID_REGEX, config.conversation_profile_name)[1] except Exception as e: raise ValueError( "Conversation profile name is not in correct format") from e def determine_dialogflow_api_endpoint(location: str) -> str: """Get Dialogflow api endpoint Reference: https://cloud.google.com/dialogflow/es/docs/reference/rest/v2-overview#service-endpoint """ dialogflow_endpoint = "dialogflow.googleapis.com" if location != 'global': dialogflow_endpoint = f"{location}-dialogflow.googleapis.com" logging.debug("Dialogflow api endpoint %s", dialogflow_endpoint) return dialogflow_endpoint def create_conversation_name(conversation_id: str, location_id: str, project: str) -> str: """Set conversation name for the object """ return f"projects/{project}/locations/{location_id}/conversations/{conversation_id}" def find_participant_by_role(role: dialogflow.Participant.Role, participants_list: list[dialogflow.Participant]) -> dialogflow.Participant | None: for participant in participants_list: if participant.role == role: logging.debug("the active participant is %s:%s ", participant.role, participant.name) return participant return None class DialogflowAPI: """Class for interacting with the Dialogflow API """ def __init__(self) -> None: self.api_endpoint = determine_dialogflow_api_endpoint( location_id) self.participants_client = dialogflow.ParticipantsClient( credentials=credentials, client_options=ClientOptions( api_endpoint=self.api_endpoint) ) self.conversations_client = dialogflow.ConversationsClient( credentials=credentials, client_options=ClientOptions( api_endpoint=self.api_endpoint ) ) self.conversation_profiles_client = dialogflow.ConversationProfilesClient( credentials=credentials, client_options=ClientOptions( api_endpoint=self.api_endpoint)) def get_conversation_profile( self, conversation_profile_name: str) -> dialogflow.ConversationProfile: """Load conversation profile """ logging.debug("Getting conversation profile for %s ", conversation_profile_name) return self.conversation_profiles_client.get_conversation_profile( request=dialogflow.GetConversationProfileRequest( name=conversation_profile_name ) ) def create_conversation( self, conversation_profile: dialogflow.ConversationProfile, conversation_id: str, ) -> dialogflow.Conversation: """Create conversation using conversation_id """ conversation = dialogflow.Conversation( conversation_profile=conversation_profile.name ) project_path = self.conversations_client.common_location_path( project, location_id) conversation_request = dialogflow.CreateConversationRequest( parent=project_path, conversation=conversation, conversation_id=conversation_id, ) conversation = self.conversations_client.create_conversation( request=conversation_request) logging.info( "Created conversation %s for project path %s", conversation.name, project_path) return conversation def get_conversation( self, conversation_name: str) -> dialogflow.Conversation: """Get conversation using the conversation_name from dialogflow """ get_conversation_request = dialogflow.GetConversationRequest( name=conversation_name, ) conversation = self.conversations_client.get_conversation( request=get_conversation_request) return conversation def list_participant(self, conversation_name: str) -> list[dialogflow.Participant]: """List existing participant for Human agent and End user Args: conversation_name (str): conversation name Returns: _type_: _description_ """ participants_pagers = self.participants_client.list_participants( dialogflow.ListParticipantsRequest(parent=conversation_name)) participant_list = list(participants_pagers.__iter__()) logging.debug("participant list %s, type ", participant_list) return participant_list def create_participant( self, conversation_name: str, role: str): """Create both the agent and customer participant for the conversation Args: conversation_id (str): conversation id from the websocket message parameters """ participant = dialogflow.Participant() participant.role = role participant = self.participants_client.create_participant( parent=conversation_name, participant=participant) logging.debug("Creating new participant %s:%s", role, participant) return participant def maintained_streaming_analyze_content( self, audio_stream: Stream, participant: dialogflow.Participant, audio_config: dialogflow.InputAudioConfig): """While the stream is not closed or terminated, maintain a steady call to streaming analyze content API endpoint """ logging.debug("Call streaming analyze content %s, %s", audio_stream.closed, audio_stream.is_final) while not audio_stream.terminate: # while not audio_stream.is_final and not audio_stream.closed: while not audio_stream.closed: self.streaming_analyze_content( audio_stream, participant, audio_config) def streaming_analyze_content( self, audio_stream: Stream, participant: dialogflow.Participant, audio_config: dialogflow.InputAudioConfig): """Call dialogflow backend StreamingAnalyzeContent endpoint, and send the audio binary stream from Audiohook. """ try: logging.debug("call streaming analyze content for %s", participant) responses = self.participants_client.streaming_analyze_content( requests=self.generator_streaming_analyze_content_request( audio_config, participant, audio_stream)) except OutOfRange as e: audio_stream.closed = True logging.warning( "The single audio stream last more than 120 second %s ", e) return except FailedPrecondition as e: audio_stream.closed = True logging.warning( "Failed the precondition check for StreamingAnalyzeContent %s ", e) return except ResourceExhausted as e: audio_stream.closed = True logging.warning( "Exceed quota for calling streaming analyze content %s ", e) return for response in responses: audio_stream.speech_end_offset = response.recognition_result.speech_end_offset.seconds * 1000 logging.debug(response) if response.recognition_result.is_final: audio_stream.is_final = True logging.debug( "Final transcript for %s: %s, and is final offset", participant.role.name, response.recognition_result.transcript, ) offset = response.recognition_result.speech_end_offset audio_stream.is_final_offset = int( offset.seconds * 1000 + offset.microseconds / 1000 ) if response.recognition_result: logging.debug( "Role %s: Interim response recognition result transcript: %s, time %s", participant.role.name, response.recognition_result.transcript, response.recognition_result.speech_end_offset) def complete_conversation(self, conversation_name: str): """Send complete conversation request to Dialogflow """ self.conversations_client.complete_conversation( name=conversation_name ) logging.debug("Call complete conversation for %s", conversation_name) def generator_streaming_analyze_content_request( self, audio_config: dialogflow.InputAudioConfig, participant: dialogflow.Participant, audio_stream: Stream): """Generates and return an iterator for StreamingAnalyzeContentRequest, The first request should only include the input_audio_config And the following request contains the audio chunks as input_audio. The last request does not have any input_audio or input_audio_config and indicates that the server side is half-closing the streaming Args: audio_config (dialogflow.InputAudioConfig): Input for Speech Recognizer https://cloud.google.com/dialogflow/es/docs/reference/rest/v2beta1/InputAudioConfig participant (dialogflow.Participant): Participant for the Dialogflow API call audio_queue (asyncio.Queue): Queue to store the audio binary stream Yields: _type_: first filed the audio config, and then yield the binary data. """ # Sending audio_config for participant enable_debugging_info = config.log_level.upper() == "DEBUG" generator = audio_stream.generator() yield dialogflow.StreamingAnalyzeContentRequest( participant=participant.name, audio_config=audio_config, enable_debugging_info=enable_debugging_info, ) for content in generator: # next audio chunks to streaming_analyze_content yield dialogflow.StreamingAnalyzeContentRequest( input_audio=content, enable_debugging_info=enable_debugging_info, ) logging.info( "Participant: %s, streaming analyze content request, end streaming yield an empty request ", participant.name) yield dialogflow.StreamingAnalyzeContentRequest( enable_debugging_info=enable_debugging_info, ) logging.debug( "Ending the current audio stream session, start new session") def await_redis(conversation_name: str) -> bool: """Check if the redis memory store connection has been established or not """ conversation_name = determine_conversation_name_without_location( conversation_name) # give the user bit time to accept the call and wait for the Agent Assist Backend # to create the redis memory store counter = AWAIT_REDIS_COUNTER redis_exists = redis_client.exists(conversation_name) != 0 while not redis_exists and counter > 0: time.sleep(AWAIT_REDIS_SECOND_PER_COUNTER) redis_exists = redis_client.exists(conversation_name) != 0 counter = counter - 1 logging.debug("return to send resume message redis client exist %s and final counter %s ", redis_exists, counter) return redis_exists def determine_conversation_name_without_location(conversation_name: str): """Returns a conversation name without its location id.""" conversation_name_without_location = conversation_name if '/locations/' in conversation_name: name_array = conversation_name.split('/') conversation_name_without_location = '/'.join( name_array[i] for i in [0, 1, -2, -1]) return conversation_name_without_location