def subscribe()

in awscrt/mqtt.py [0:0]


    def subscribe(self, topic, qos, callback=None):
        """Subscribe to a topic filter (async).

        The client sends a SUBSCRIBE packet and the server responds with a SUBACK.

        subscribe() may be called while the device is offline, though the async
        operation cannot complete successfully until the connection resumes.

        Once subscribed, `callback` is invoked each time a message matching
        the `topic` is received. It is possible for such messages to arrive before
        the SUBACK is received.

        Args:
            topic (str): Subscribe to this topic filter, which may include wildcards.
            qos (QoS): Maximum requested QoS that server may use when sending messages to the client.
                The server may grant a lower QoS in the SUBACK (see returned Future)
            callback: Optional callback invoked when message received.
                Function should take the following arguments and return nothing:

                    *   `topic` (str): Topic receiving message.

                    *   `payload` (bytes): Payload of message.

                    *   `dup` (bool): DUP flag. If True, this might be re-delivery
                        of an earlier attempt to send the message.

                    *   `qos` (:class:`QoS`): Quality of Service used to deliver the message.

                    *   `retain` (bool): Retain flag. If True, the message was sent
                        as a result of a new subscription being made by the client.

                    *   `**kwargs` (dict): Forward-compatibility kwargs.

                For backward compatibility, a callback that only accepts
                `topic` and `payload` is still supported: its signature is
                inspected once, when subscribe() is called, and it is then
                invoked with just those two arguments.

        Returns:
            Tuple[concurrent.futures.Future, int]: Tuple containing a Future and
            the ID of the SUBSCRIBE packet. The Future completes when a
            SUBACK is received from the server. If successful, the Future will
            contain a dict with the following members:

                *   ['packet_id'] (int): ID of the SUBSCRIBE packet being acknowledged.

                *   ['topic'] (str): Topic filter of the SUBSCRIBE packet being acknowledged.

                *   ['qos'] (:class:`QoS`): Maximum QoS that was granted by the server.
                    This may be lower than the requested QoS.

            If unsuccessful, the Future contains an exception. The exception
            will be a :class:`SubscribeError` if a SUBACK was received
            in which the server rejected the subscription. Other exception
            types indicate other errors with the operation.

            If the SUBSCRIBE could not even be issued (for example because an
            argument is invalid), the Future already contains the exception
            when it is returned and the packet ID is 0.
        """

        future = Future()
        packet_id = 0

        if callback:
            # Imported locally so this method does not depend on a module-level import.
            import inspect

            # Decide ONCE which arguments the user's callback accepts, instead
            # of calling it and catching TypeError. Catching TypeError around
            # the call would also swallow TypeErrors raised *inside* the
            # callback and then invoke the callback a second time.
            #   True  -> accepts topic, payload, dup, qos, retain
            #   False -> legacy callback that only accepts topic and payload
            #   None  -> signature could not be inspected
            try:
                callback_sig = inspect.signature(callback)
            except (TypeError, ValueError):
                # Either not callable (rejected by the validation below) or a
                # callable that does not expose an introspectable signature.
                accepts_all_args = None
            else:
                try:
                    # bind() only checks that the argument names fit the
                    # signature; the placeholder values are never used.
                    callback_sig.bind(topic=None, payload=None, dup=None, qos=None, retain=None)
                except TypeError:
                    accepts_all_args = False
                else:
                    accepts_all_args = True

            def callback_wrapper(topic, payload, dup, qos, retain):
                if accepts_all_args is False:
                    # This callback used to have fewer args. Pass only those
                    # args, to cover the case where the user function failed
                    # to take forward-compatibility **kwargs.
                    callback(topic=topic, payload=payload)
                    return

                if accepts_all_args:
                    callback(topic=topic, payload=payload, dup=dup, qos=QoS(qos), retain=retain)
                    return

                # Signature unknown: keep the historical best-effort behavior
                # of trying the full argument list first and falling back to
                # the legacy two-argument form on TypeError.
                try:
                    callback(topic=topic, payload=payload, dup=dup, qos=QoS(qos), retain=retain)
                except TypeError:
                    callback(topic=topic, payload=payload)
        else:
            callback_wrapper = None

        def suback(packet_id, topic, qos, error_code):
            # Invoked when the SUBSCRIBE operation finishes; completes the Future.
            if error_code:
                future.set_exception(awscrt.exceptions.from_code(error_code))
            else:
                qos = _try_qos(qos)
                if qos is None:
                    # The value in the SUBACK is not a QoS: report the
                    # subscription as rejected.
                    future.set_exception(SubscribeError(topic))
                else:
                    future.set_result(dict(
                        packet_id=packet_id,
                        topic=topic,
                        qos=qos,
                    ))

        try:
            assert callable(callback) or callback is None
            assert isinstance(qos, QoS)
            packet_id = _awscrt.mqtt_client_connection_subscribe(
                self._binding, topic, qos.value, callback_wrapper, suback)
        except Exception as e:
            # Failures to start the operation are reported through the Future
            # rather than raised, so callers have a single error path.
            future.set_exception(e)

        return future, packet_id