in rpi-image-builder/aws-iot-fleet-provisioning/provisioning_handler.py [0:0]
def enable_error_monitor(self):
""" Subscribe to pertinent IoTCore topics that would emit errors
"""
template_reject_topic = "$aws/provisioning-templates/{}/provision/json/rejected".format(self.template_name)
certificate_reject_topic = "$aws/certificates/create/json/rejected"
template_accepted_topic = "$aws/provisioning-templates/{}/provision/json/accepted".format(self.template_name)
certificate_accepted_topic = "$aws/certificates/create/json/accepted"
subscribe_topics = [template_reject_topic, certificate_reject_topic, template_accepted_topic, certificate_accepted_topic]
for mqtt_topic in subscribe_topics:
print("Subscribing to topic '{}'...".format(mqtt_topic))
mqtt_topic_subscribe_future, _ = self.primary_MQTTClient.subscribe(
topic=mqtt_topic,
qos=mqtt.QoS.AT_LEAST_ONCE,
callback=self.basic_callback)
# Wait for subscription to succeed
mqtt_topic_subscribe_result = mqtt_topic_subscribe_future.result()
print("Subscribed with {}".format(str(mqtt_topic_subscribe_result['qos'])))