in aa-integration-backend/cloud-pubsub-interceptor/main.py [0:0]
def cloud_pubsub_handler(request, data_type):
    """Verifies a Cloud Pub/Sub push request and relays it to Redis pub/sub.

    The decoded payload is published on the Redis channel
    '<server_id>:<conversation_name>', where server_id is the value stored in
    Redis under the (location-less) conversation name. Requests that cannot be
    processed (missing or malformed envelope, no or undecodable data, no
    conversation in the payload, no server registered for the conversation)
    are logged and skipped.

    Args:
        request: Incoming HTTP request; only its get_json() method is used.
        data_type: Event type string forwarded as 'data_type'. For
            'new-recognition-result-notification-event' the participant role
            and message id found in the message attributes are forwarded too.

    Returns:
        Always True, whether the message was relayed or skipped.
        NOTE(review): presumably the caller acknowledges the push on True so
        that Pub/Sub does not redeliver unprocessable messages -- confirm
        against the route handler.
    """
    # Imported locally: only the `datetime` class is known to be in scope at
    # module level in this file.
    from datetime import timezone

    envelope = request.get_json()
    if not envelope:
        msg = 'No Pub/Sub message received.'
        logging.warning('Warning: {}'.format(msg))
        return True

    if not isinstance(envelope, dict) or 'message' not in envelope:
        msg = 'Invalid Pub/Sub message format.'
        logging.warning('Warning: {}'.format(msg))
        return True

    pubsub_message = envelope['message']
    if not isinstance(pubsub_message, dict):
        msg = 'Invalid Pub/Sub message, message inside the envelope should be valid JSON format https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage.'
        logging.warning('Warning: {}'.format(msg))
        return True

    if 'data' not in pubsub_message:
        # Nothing to relay.
        return True

    # Optional attributes, only forwarded for new recognition results.
    participant_role = ''
    new_recognition_result_message_id = ''
    attributes = pubsub_message.get('attributes')
    if isinstance(attributes, dict):
        participant_role = attributes.get('participant_role', '')
        new_recognition_result_message_id = attributes.get('message_id', '')

    # A payload that cannot be decoded is handled like every other malformed
    # message: log it and return instead of raising out of the handler.
    # binascii.Error and UnicodeDecodeError are both ValueError subclasses;
    # TypeError covers a non-string 'data' value.
    try:
        data = base64.b64decode(pubsub_message['data']).decode('utf-8')
    except (TypeError, ValueError) as err:
        msg = 'Invalid Pub/Sub message data, expected base64-encoded UTF-8 text'
        logging.warning('Warning: {}: {}'.format(msg, err))
        return True
    logging.debug('Subscribed Pub/Sub message: {}'.format(data))

    try:
        data_object = json.loads(data)
    except ValueError as err:  # json.JSONDecodeError is a ValueError.
        msg = 'Invalid Pub/Sub message data, expected JSON'
        logging.warning('Warning: {}: {}'.format(msg, err))
        return True

    # Non-object JSON (number, string, list, ...) cannot carry a conversation.
    if not isinstance(data_object, dict) or 'conversation' not in data_object:
        msg = 'Cannot extract conversation id from Pub/Sub request.'
        logging.warning('Warning: {}'.format(msg))
        return True

    full_conversation_name = data_object['conversation']
    conversation_name = get_conversation_name_without_location(
        full_conversation_name)
    logging.debug(
        'conversation_name: {0}, conversation_name_without_location: {1}'.format(
            full_conversation_name, conversation_name))

    # Delivery metadata is only relayed for diagnostics, so a missing field
    # is forwarded as None rather than aborting the relay with a KeyError.
    publish_time = pubsub_message.get('publishTime')
    message_id = pubsub_message.get('messageId')

    # Emits messages to redis pub/sub
    msg_data = {
        'conversation_name': conversation_name,
        'data': data,
        'data_type': data_type,
        # The timestamp when the server receives this message.
        # It could help with analyzing service time of each part of the
        # backend infrastructure, which can be used for load testing.
        # Taken in UTC so that the literal 'Z' suffix is accurate.
        'ack_time': datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ'),
        'publish_time': publish_time,
        'message_id': message_id,
    }
    if data_type == 'new-recognition-result-notification-event':
        msg_data['participant_role'] = participant_role
        msg_data['new_recognition_result_message_id'] = new_recognition_result_message_id
        logging.debug('participant role {0} message id {1} for new recognition result'.format(
            participant_role, new_recognition_result_message_id))

    # Single GET instead of EXISTS followed by GET: the key may disappear
    # between the two calls, in which case GET returns None and .decode()
    # would raise.
    raw_server_id = redis_client.get(conversation_name)
    if raw_server_id is None:
        logging.warning(
            "No SERVER_ID (UI Connector instance) for conversation name {}. Please subscribe to the conversation by sending join-conversation event.".format(conversation_name))
        return True

    server_id = raw_server_id.decode('utf-8')
    channel = '{}:{}'.format(server_id, conversation_name)
    redis_client.publish(channel, json.dumps(msg_data))
    logging.debug(
        'Redis publish (message_id: {0}, publish_time: {1}, conversation_name: {2}, channel: {3}, data_type: {4}.'.format(
            message_id, publish_time, conversation_name, channel, data_type))
    return True