in src/main.py [0:0]
def pubsub_message_callback(self, topic, payload):
    '''
    Single callback for all (IPC and MQTT) PubSub messages.

    Parses the JSON payload, validates the common and ReqRes specific fields,
    then passes the message to the router that matches the topic. Any error
    raised while parsing, validating or routing is logged and not re-raised,
    so this callback never propagates an exception to its caller.

    Args:
        topic: Topic the message was received on. Compared against
            self.ipc_service_topic, self.mqtt_service_topic,
            self.ipc_broadcast_topic and self.mqtt_broadcast_topic.
        payload: JSON document handed to json.loads. It must decode to a JSON
            object holding 'message-id', 'reqres' and 'command'.

    Returns:
        None.
    '''
    try:
        # Log incoming message
        log.info('Received PubSub Message. Topic: {} - Message: {}'.format(topic, payload))

        # Note: Expects all ingress messages to be in JSON format
        message = json.loads(payload)

        # Valid JSON that is not an object (list, string, number, ...) cannot carry
        # the named fields read below; report it as a message format error rather
        # than letting the field lookups fail with an unrelated TypeError.
        if not isinstance(message, dict):
            raise ValueError(
                'PubSub message payload must be a JSON object, received: {}'.format(type(message).__name__)
            )

        ########################################################
        #### Common Message Parameter Validation ####
        # Mandatory fields; a missing one raises KeyError, handled below.
        message_id = message['message-id']
        reqres = message['reqres']
        command = message['command']

        ########################################################
        #### ReqRes Parameter Validation ####
        # RESPONSE / UPDATE messages must carry a 2xx status to be processed further.
        if reqres == 'response' or reqres == 'update':
            status = message['response']['status']
            if status < 200 or status > 299:
                raise Exception('PubSub RESPONSE received with status: {}'.format(status))

        # REQUEST messages must name where the reply is to be sent.
        elif reqres == 'request':
            for required_field in ('reply-topic', 'reply-sdk'):
                if required_field not in message:
                    # Same error a direct lookup of the missing field would raise.
                    raise KeyError(required_field)

        # Any other ReqRes message type is not recognised.
        else:
            raise Exception('Message PubSub received with unknown ReqRes type: {}'.format(reqres))

        ########################################################
        #### Message Routing ####
        # Route IPC service topic message.
        if topic == self.ipc_service_topic:
            self.ipc_service_topic_router(message_id, reqres, command, message)

        # Route MQTT service topic message.
        elif topic == self.mqtt_service_topic:
            self.mqtt_service_topic_router(message_id, reqres, command, message)

        # Route IPC and MQTT broadcast topics to a single router.
        elif topic == self.ipc_broadcast_topic or topic == self.mqtt_broadcast_topic:
            self.application_broadcast_topic_router(message_id, reqres, command, message)

        ## Route custom topics that this component subscribes to here,
        ## as needed for the specific component / application.
        else:
            raise Exception('PubSub message received on unknown / unhandled topic.')

    except ValueError as val_error:  # includes JSON parsing errors
        log.error('VAL_ERROR: JSON Parsing Error / Unexpected PubSub message format received. ERROR MESSAGE: {} - TOPIC: {} - PAYLOAD: {}'.format(val_error, topic, payload))

    except KeyError as key_error:  # pragma: no cover # includes requests for fields that don't exist in the received object
        log.error('KEY_ERROR: Received PubSub message missing required fields. ERROR MESSAGE: {} - TOPIC: {} - PAYLOAD: {}'.format(key_error, topic, payload))

    except Exception as err:
        log.error('EXCEPTION: Exception raised from PubSub message received. ERROR MESSAGE: {} - TOPIC: {} - PAYLOAD: {}'.format(err, topic, payload))