in src/pubsub/mqtt_pubsub.py [0:0]
def publish_to_mqtt(self, topic, message):
    """
    Publish a Python object, serialised as a JSON message, to an IoT Core MQTT topic.

    The topic name and the UTF-8 encoded JSON payload are written onto the
    ``self.mqtt_request`` object, which is then handed to a new publish
    operation obtained from ``self.mqtt_publish_client``. The call then blocks
    on the operation's response future, passing ``self.mqtt_timeout`` as the
    timeout (presumably seconds - confirm against the IPC client in use).

    Every ``Exception`` raised along the way (including serialisation errors
    from ``json.dumps``) is logged via ``log.exception`` and swallowed;
    nothing is re-raised and no value is returned, so callers cannot tell
    from the return value whether the publish succeeded.

    Args:
        topic: Topic name assigned to ``self.mqtt_request.topic_name``.
        message: Object serialised with ``json.dumps`` to form the payload.
    """
    try:
        log.debug('MQTT PUBLISH: topic: {} - Message: {}'.format(topic, message))

        # The request object lives on the instance and is overwritten on each call.
        request = self.mqtt_request
        request.topic_name = topic
        request.payload = json.dumps(message).encode("utf-8")

        publish_op = self.mqtt_publish_client.new_publish_to_iot_core()
        publish_op.activate(request)

        # Wait for the publish response, bounded by the configured timeout.
        publish_op.get_response().result(self.mqtt_timeout)
    except KeyError as exc:  # pragma: no cover
        # Includes requests for fields that don't exist in the received object.
        log.exception('KEY_ERROR: KeyError occurred while publishing to IoT Core on MQTT Topic. ERROR MESSAGE: {} - TOPIC: {} - MESSAGE: {}'.format(exc, topic, message))
    except concurrent.futures.TimeoutError as exc:  # pragma: no cover
        log.exception('TIMEOUT_ERROR: Timeout occurred while publishing to IoT Core on MQTT Topic. ERROR MESSAGE: {} - TOPIC: {} - MESSAGE: {}'.format(exc, topic, message))
    except UnauthorizedError as exc:  # pragma: no cover
        log.exception('UNAUTHORIZED_ERROR: Unauthorized error while publishing to IoT Core on MQTT Topic. ERROR MESSAGE: {} - TOPIC: {} - MESSAGE: {}'.format(exc, topic, message))
    except Exception as exc:  # pragma: no cover
        # Catch-all so a failed publish never propagates to the caller.
        log.exception('EXCEPTION: Exception while publishing to IoT Core on MQTT Topic. ERROR MESSAGE: {} - TOPIC: {} - MESSAGE: {}'.format(exc, topic, message))