aa-integration-backend/cloud-pubsub-interceptor/main.py (108 lines of code) (raw):

# Copyright 2025 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import logging import base64 import os import redis import json from datetime import datetime from flask import Flask, request # Cloud run could recognize logging files under '/var/log/' folder # Comment this line for local test logging.basicConfig(filename='/var/log/test.log', level=logging.INFO) app = Flask(__name__) # Redis setup redis_host = os.environ.get('REDISHOST', 'localhost') redis_port = int(os.environ.get('REDISPORT', 6379)) redis_client = redis.StrictRedis( host=redis_host, port=redis_port, health_check_interval=10, socket_connect_timeout=15, retry_on_timeout=True, socket_keepalive=True, retry=redis.retry.Retry(redis.backoff.ExponentialBackoff(cap=5, base=1), 5), retry_on_error=[redis.exceptions.ConnectionError, redis.exceptions.TimeoutError, redis.exceptions.ResponseError]) def get_conversation_name_without_location(conversation_name): """Returns a conversation name without its location id.""" conversation_name_without_location = conversation_name if '/locations/' in conversation_name: name_array = conversation_name.split('/') conversation_name_without_location = '/'.join( name_array[i] for i in [0, 1, -2, -1]) return conversation_name_without_location def cloud_pubsub_handler(request, data_type): """Verifies and checks requests from Cloud Pub/Sub.""" envelope = request.get_json() if not envelope: msg = 'No Pub/Sub message received.' logging.warning('Warning: {}'.format(msg)) return True if not isinstance(envelope, dict) or 'message' not in envelope: msg = 'Invalid Pub/Sub message format.' logging.warning('Warning: {}'.format(msg)) return True pubsub_message = envelope['message'] participant_role = "" new_recognition_result_message_id = '' if not isinstance(pubsub_message, dict): msg = 'Invalid Pub/Sub message, message inside the envelope should be valid JSON format https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage.' logging.warning('Warning: {}'.format(msg)) return True if 'attributes' in pubsub_message: attributes = pubsub_message['attributes'] if attributes is not None and isinstance(attributes, dict): if "participant_role" in attributes: participant_role = attributes['participant_role'] if "message_id" in attributes: new_recognition_result_message_id = attributes['message_id'] if 'data' in pubsub_message: data = base64.b64decode(pubsub_message['data']).decode('utf-8') logging.debug('Subscribed Pub/Sub message: {}'.format(data)) data_object = json.loads(data) if 'conversation' not in data_object: msg = 'Cannot extract conversation id from Pub/Sub request.' logging.warning('Warning: {}'.format(msg)) return True conversation_name = data_object['conversation'] conversation_name = get_conversation_name_without_location(conversation_name) logging.debug('conversation_name: {0}, conversation_name_without_location: {1}'.format(data_object['conversation'], conversation_name)) # Emits messages to redis pub/sub msg_data = {'conversation_name': conversation_name, 'data': data, 'data_type': data_type, # The timestamp when the server receives this message. # It could help with analyzing service time of each part of the backend # infrastructure, which can be used for load testing. 'ack_time': datetime.now().strftime('%Y-%m-%dT%H:%M:%SZ'), 'publish_time': pubsub_message['publishTime'], 'message_id': pubsub_message['messageId']} server_id = '1' if data_type == 'new-recognition-result-notification-event': msg_data['participant_role'] = participant_role msg_data['new_recognition_result_message_id'] = new_recognition_result_message_id logging.debug('participant role {0} message id {1} for new recognition result'.format( participant_role, new_recognition_result_message_id)) if redis_client.exists(conversation_name) == 0: logging.warning( "No SERVER_ID (UI Connector instance) for conversation name {}. Please subscribe to the conversation by sending join-conversation event.".format(conversation_name)) return True else: server_id = redis_client.get(conversation_name).decode('utf-8') channel = '{}:{}'.format(server_id, conversation_name) redis_client.publish(channel, json.dumps(msg_data)) logging.debug( 'Redis publish (message_id: {0}, publish_time: {1}, conversation_name: {2}, channel: {3}, data_type: {4}.'.format( pubsub_message['messageId'], pubsub_message['publishTime'], conversation_name, channel, data_type)) return True @app.route('/human-agent-assistant-event', methods=['POST']) def subscribe_suggestions(): """Receives new human agent assist events from pre-configured dialogflow Pub/Sub topic.""" if not cloud_pubsub_handler(request, 'human-agent-assistant-event'): return f'Bad Request', 400 return ('Received by cloud run service as a HumanAgentAssistantEvent.', 204) @app.route('/conversation-lifecycle-event', methods=['POST']) def subscribe_lifecycle_events(): """Receives new conversation events from pre-configured dialogflow Pub/Sub topic.""" if not cloud_pubsub_handler(request, 'conversation-lifecycle-event'): return f'Bad Request', 400 return ('Received by cloud run service as a ConversationEvent.', 204) @app.route('/new-message-event', methods=['POST']) def subscribe_message_events(): """Receives new message events from pre-configured dialogflow Pub/Sub topic.""" if not cloud_pubsub_handler(request, 'new-message-event'): return f'Bad Request', 400 return ('Received by cloud run service as a ConversationEvent.', 204) @app.route('/new-recognition-result-notification-event', methods=['POST']) def subscribe_new_recognition_result_events(): """Receives new recognition result events from pre-configured dialogflow Pub/Sub topic.""" if not cloud_pubsub_handler(request, 'new-recognition-result-notification-event'): return f'Bad Request', 400 return ('Received by cloud run service as a New recognition result notification.', 204) if __name__ == '__main__': PORT = int(os.getenv('PORT')) if os.getenv('PORT') else 8080 # This is used when running locally. Gunicorn is used to run the # application on Cloud Run. See entrypoint in Dockerfile. app.run(host='127.0.0.1', port=PORT, debug=True)