in awscrt/mqtt.py [0:0]
def subscribe(self, topic, qos, callback=None):
    """Subscribe to a topic filter (async).

    The client sends a SUBSCRIBE packet and the server responds with a SUBACK.

    subscribe() may be called while the device is offline, though the async
    operation cannot complete successfully until the connection resumes.

    Once subscribed, `callback` is invoked each time a message matching
    the `topic` is received. It is possible for such messages to arrive before
    the SUBACK is received.

    Invalid arguments and errors raised while issuing the SUBSCRIBE are not
    raised to the caller; they are delivered through the returned Future,
    and the returned packet ID is 0 in that case.

    Args:
        topic (str): Subscribe to this topic filter, which may include wildcards.
        qos (QoS): Maximum requested QoS that server may use when sending messages to the client.
            The server may grant a lower QoS in the SUBACK (see returned Future)
        callback: Optional callback invoked when message received.
            Function should take the following arguments and return nothing:

                *   `topic` (str): Topic receiving message.

                *   `payload` (bytes): Payload of message.

                *   `dup` (bool): DUP flag. If True, this might be re-delivery
                    of an earlier attempt to send the message.

                *   `qos` (:class:`QoS`): Quality of Service used to deliver the message.

                *   `retain` (bool): Retain flag. If True, the message was sent
                    as a result of a new subscription being made by the client.

                *   `**kwargs` (dict): Forward-compatibility kwargs.

    Returns:
        Tuple[concurrent.futures.Future, int]: Tuple containing a Future and
        the ID of the SUBSCRIBE packet. The Future completes when a
        SUBACK is received from the server. If successful, the Future will
        contain a dict with the following members:

            *   ['packet_id'] (int): ID of the SUBSCRIBE packet being acknowledged.

            *   ['topic'] (str): Topic filter of the SUBSCRIBE packet being acknowledged.

            *   ['qos'] (:class:`QoS`): Maximum QoS that was granted by the server.
                This may be lower than the requested QoS.

        If unsuccessful, the Future contains an exception. The exception
        will be a :class:`SubscribeError` if a SUBACK was received
        in which the server rejected the subscription. Other exception
        types indicate other errors with the operation.
    """
    future = Future()
    packet_id = 0

    # Compare against None instead of relying on truthiness, so that a
    # callable object which happens to be falsy (e.g. one defining __len__
    # or __bool__) is still installed as the message handler.
    if callback is not None:

        def callback_wrapper(topic, payload, dup, qos, retain):
            try:
                callback(topic=topic, payload=payload, dup=dup, qos=QoS(qos), retain=retain)
            except TypeError:
                # This callback used to have fewer args.
                # Try again, passing only those args, to cover case where
                # user function failed to take forward-compatibility **kwargs.
                # NOTE: a TypeError raised from inside the callback body also
                # lands here and results in a second invocation.
                callback(topic=topic, payload=payload)
    else:
        callback_wrapper = None

    def suback(packet_id, topic, qos, error_code):
        # Completes the Future exactly once, based on the SUBACK outcome.
        if error_code:
            future.set_exception(awscrt.exceptions.from_code(error_code))
            return

        # Presumably _try_qos() yields None when the SUBACK return code is
        # not a valid QoS, i.e. the server rejected the subscription.
        granted_qos = _try_qos(qos)
        if granted_qos is None:
            future.set_exception(SubscribeError(topic))
            return

        future.set_result(dict(
            packet_id=packet_id,
            topic=topic,
            qos=granted_qos,
        ))

    try:
        # Explicit checks rather than `assert`, so validation still happens
        # when Python runs with -O. AssertionError is kept as the exception
        # type so the Future reports the same type as before.
        if callback is not None and not callable(callback):
            raise AssertionError("callback must be callable or None")
        if not isinstance(qos, QoS):
            raise AssertionError("qos must be a QoS")

        packet_id = _awscrt.mqtt_client_connection_subscribe(
            self._binding, topic, qos.value, callback_wrapper, suback)
    except Exception as e:
        # Report every failure through the Future; packet_id stays 0.
        future.set_exception(e)

    return future, packet_id