def new_cert_pub_sub()

in rpi-image-builder/aws-iot-fleet-provisioning/provisioning_handler.py [0:0]


    def new_cert_pub_sub(self):
        """Subscribe to the 'openworld' topic and then publish a test message to it.

        Exercises ``self.test_MQTTClient`` against the 'openworld' topic (the
        topic which was specified in the policy for the new certificate):

        1. Subscribe with QoS AT_LEAST_ONCE, routing incoming messages to
           ``self.basic_callback``.
        2. Block on the future returned by ``subscribe`` and print the ``'qos'``
           entry of its result.
        3. Publish a JSON-encoded ``service_response`` message to the same topic.

        The return value of ``publish`` is not inspected, so this method does
        not itself wait for the publish to complete.
        """
        topic = "openworld"
        print("Subscribing to topic '{}'...".format(topic))

        at_least_once = mqtt.QoS.AT_LEAST_ONCE
        # subscribe() hands back a pair; only the first element (a future) is used here.
        subscribe_future, _ = self.test_MQTTClient.subscribe(
            topic=topic,
            qos=at_least_once,
            callback=self.basic_callback,
        )

        # .result() blocks until the subscribe future completes.
        subscribe_result = subscribe_future.result()
        granted_qos = str(subscribe_result['qos'])
        print("Subscribed with {}".format(granted_qos))

        # Same topic as the subscription, so the message comes back through basic_callback
        # if the broker delivers it to this client.
        message = json.dumps(
            {"service_response": "##### RESPONSE FROM PREVIOUSLY FORBIDDEN TOPIC #####"}
        )
        self.test_MQTTClient.publish(
            topic=topic,
            payload=message,
            qos=at_least_once,
        )