in rpi-image-builder/aws-iot-fleet-provisioning/provisioning_handler.py [0:0]
def new_cert_pub_sub(self):
"""Method testing a call to the 'openworld' topic (which was specified in the policy for the new certificate)
"""
new_cert_topic = "openworld"
print("Subscribing to topic '{}'...".format(new_cert_topic))
mqtt_topic_subscribe_future, _ = self.test_MQTTClient.subscribe(
topic=new_cert_topic,
qos=mqtt.QoS.AT_LEAST_ONCE,
callback=self.basic_callback)
# Wait for subscription to succeed
mqtt_topic_subscribe_result = mqtt_topic_subscribe_future.result()
print("Subscribed with {}".format(str(mqtt_topic_subscribe_result['qos'])))
self.test_MQTTClient.publish(
topic="openworld",
payload=json.dumps({"service_response": "##### RESPONSE FROM PREVIOUSLY FORBIDDEN TOPIC #####"}),
qos=mqtt.QoS.AT_LEAST_ONCE)