def cloud_pubsub_handler()

in aa-integration-backend/cloud-pubsub-interceptor/main.py [0:0]


def cloud_pubsub_handler(request, data_type):
    """Verifies a Cloud Pub/Sub push request and relays it to Redis pub/sub.

    The request body must be a JSON envelope with a 'message' member
    (https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage).
    The message's base64-encoded 'data' must decode to a JSON object that
    carries a 'conversation' field. The conversation name is passed through
    get_conversation_name_without_location() and used as the Redis key whose
    value is the SERVER_ID of the UI Connector instance serving that
    conversation. The message is then published on the Redis channel
    '<server_id>:<conversation_name>'.

    Args:
        request: Object exposing get_json(), which returns the parsed request
            body (presumably a Flask request -- confirm against the caller).
        data_type: Event type label copied into the relayed message. For
            'new-recognition-result-notification-event' the 'participant_role'
            and 'message_id' Pub/Sub attributes are relayed as well.

    Returns:
        Always True. Malformed messages, and messages for conversations with
        no registered SERVER_ID, are logged and dropped instead of raising.
        Errors raised by the Redis client itself still propagate.
    """
    # Local import: the surrounding module is only known to expose `datetime`.
    from datetime import timezone

    envelope = request.get_json()
    if not envelope:
        msg = 'No Pub/Sub message received.'
        logging.warning('Warning: {}'.format(msg))
        return True

    if not isinstance(envelope, dict) or 'message' not in envelope:
        msg = 'Invalid Pub/Sub message format.'
        logging.warning('Warning: {}'.format(msg))
        return True

    pubsub_message = envelope['message']
    if not isinstance(pubsub_message, dict):
        msg = 'Invalid Pub/Sub message, message inside the envelope should be valid JSON format https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage.'
        logging.warning('Warning: {}'.format(msg))
        return True

    # Optional attributes; both default to an empty string when absent.
    participant_role = ""
    new_recognition_result_message_id = ''
    attributes = pubsub_message.get('attributes')
    if isinstance(attributes, dict):
        participant_role = attributes.get('participant_role', participant_role)
        new_recognition_result_message_id = attributes.get(
            'message_id', new_recognition_result_message_id)

    # A message without a payload has nothing to relay.
    if 'data' not in pubsub_message:
        return True

    # Treat an undecodable payload like any other malformed message: log and
    # drop it rather than raising. binascii.Error and UnicodeDecodeError are
    # both ValueError subclasses; TypeError covers a non-string 'data' value.
    try:
        data = base64.b64decode(pubsub_message['data']).decode('utf-8')
    except (ValueError, TypeError):
        msg = 'Invalid Pub/Sub message, data is not base64-encoded UTF-8 text.'
        logging.warning('Warning: {}'.format(msg))
        return True
    logging.debug('Subscribed Pub/Sub message: {}'.format(data))

    try:
        data_object = json.loads(data)
    except ValueError:  # json.JSONDecodeError is a ValueError subclass.
        msg = 'Invalid Pub/Sub message, data is not valid JSON.'
        logging.warning('Warning: {}'.format(msg))
        return True

    # The payload must be a JSON object; scalars and arrays cannot carry a
    # 'conversation' field.
    if not isinstance(data_object, dict) or 'conversation' not in data_object:
        msg = 'Cannot extract conversation id from Pub/Sub request.'
        logging.warning('Warning: {}'.format(msg))
        return True

    full_conversation_name = data_object['conversation']
    conversation_name = get_conversation_name_without_location(full_conversation_name)
    logging.debug('conversation_name: {0}, conversation_name_without_location: {1}'.format(full_conversation_name, conversation_name))

    # Missing fields are relayed as None instead of raising KeyError.
    message_id = pubsub_message.get('messageId')
    publish_time = pubsub_message.get('publishTime')

    # Emits messages to redis pub/sub
    msg_data = {'conversation_name': conversation_name,
                'data': data,
                'data_type': data_type,
                # The timestamp when the server receives this message.
                # It could help with analyzing service time of each part of the backend
                # infrastructure, which can be used for load testing.
                # Taken in UTC so that the literal 'Z' suffix is accurate.
                'ack_time': datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ'),
                'publish_time': publish_time,
                'message_id': message_id}
    if data_type == 'new-recognition-result-notification-event':
        msg_data['participant_role'] = participant_role
        msg_data['new_recognition_result_message_id'] = new_recognition_result_message_id
        logging.debug('participant role {0} message id {1} for new recognition result'.format(
            participant_role, new_recognition_result_message_id))

    # Single GET instead of EXISTS followed by GET: one round trip, and no
    # window in which the key can disappear between the check and the read.
    raw_server_id = redis_client.get(conversation_name)
    if raw_server_id is None:
        logging.warning(
            "No SERVER_ID (UI Connector instance) for conversation name {}. Please subscribe to the conversation by sending join-conversation event.".format(conversation_name))
        return True
    # The client returns bytes unless it was created with decode_responses.
    if isinstance(raw_server_id, bytes):
        server_id = raw_server_id.decode('utf-8')
    else:
        server_id = str(raw_server_id)

    channel = '{}:{}'.format(server_id, conversation_name)
    redis_client.publish(channel, json.dumps(msg_data))
    logging.debug(
        'Redis publish (message_id: {0}, publish_time: {1}, conversation_name: {2}, channel: {3}, data_type: {4}.'.format(
            message_id, publish_time, conversation_name, channel, data_type))
    return True