def _subscribe_operation()

in awsiot/__init__.py [0:0]


    def _subscribe_operation(self,
                             topic: str,
                             qos: int,
                             callback: Callable[[T], None],
                             payload_to_class_fn: PayloadToClassFn) -> Tuple[Future, str]:
        """
        Performs a 'Subscribe' style operation for an MQTT service.
        Messages received from this topic are processed as JSON,
        converted to the desired class by `payload_to_class_fn`,
        then passed to `callback`.

        Parameters:
        topic - The topic to subscribe to.
        qos   - The Quality of Service guarantee of this message
        callback - The callback to invoke when a message is received.
                The callback should take one argument of the type
                returned by payload_to_class_fn. The callback
                is not expected to return a value. If the payload
                cannot be decoded, parsed as JSON, or converted by
                payload_to_class_fn, the callback is invoked with
                None instead.
        payload_to_class_fn - A function which takes one argument,
                a dict, and returns a class of the type expected by
                `callback`. The dict comes from parsing the received
                message as JSON.

        Returns two values. The first is a `Future` whose result will be the
        `awscrt.mqtt.QoS` granted by the server, or an exception if the
        subscription fails. The second value is a topic which may be passed to
        `unsubscribe()` to stop receiving messages.
        Note that messages may arrive before the subscription is acknowledged.

        This method does not raise if the underlying `subscribe()` call
        raises an `Exception`; the error is stored on the returned `Future`.
        """

        future = Future()  # type: Future
        try:
            def on_suback(suback_future):
                # Relay the outcome of the underlying subscribe onto the
                # future that was handed back to the caller.
                try:
                    suback_result = suback_future.result()
                    future.set_result(suback_result['qos'])
                except Exception as e:
                    future.set_exception(e)

            def callback_wrapper(topic, payload, dup, qos, retain, **kwargs):
                try:
                    payload_obj = json.loads(payload.decode())
                    event = payload_to_class_fn(payload_obj)
                except Exception:
                    # can't deliver payload, invoke callback with None.
                    # Only Exception is caught so that KeyboardInterrupt and
                    # SystemExit are not silently turned into a None event.
                    event = None
                callback(event)

            sub_future, _ = self.mqtt_connection.subscribe(
                topic=topic,
                qos=qos,
                callback=callback_wrapper,
            )
            sub_future.add_done_callback(on_suback)

        except Exception as e:
            # Report synchronous subscribe failures through the future so
            # callers have a single place to observe errors.
            future.set_exception(e)

        return future, topic