in awsiot/__init__.py [0:0]
def _subscribe_operation(self,
topic: str,
qos: int,
callback: Callable[[T], None],
payload_to_class_fn: PayloadToClassFn) -> Tuple[Future, str]:
"""
Performs a 'Subscribe' style operation for an MQTT service.
Messages received from this topic are processed as JSON,
converted to the desired class by `payload_to_class_fn`,
then passed to `callback`.
Parameters:
topic - The topic to subscribe to.
qos - The Quality of Service guarantee of this message
callback - The callback to invoke when a message is received.
The callback should take one argument of the type
returned by payload_to_class_fn. The callback
is not expected to return a value.
payload_to_class_fn - A function which takes one argument,
a dict, and returns a class of the type expected by
`callback`. The dict comes from parsing the received
message as JSON.
Returns two values. The first is a `Future` whose result will be the
`awscrt.mqtt.QoS` granted by the server, or an exception if the
subscription fails. The second value is a topic which may be passed to
`unsubscribe()` to stop receiving messages.
Note that messages may arrive before the subscription is acknowledged.
"""
future = Future() # type: Future
try:
def on_suback(suback_future):
try:
suback_result = suback_future.result()
future.set_result(suback_result['qos'])
except Exception as e:
future.set_exception(e)
def callback_wrapper(topic, payload, dup, qos, retain, **kwargs):
try:
payload_obj = json.loads(payload.decode())
event = payload_to_class_fn(payload_obj)
except BaseException:
# can't deliver payload, invoke callback with None
event = None
callback(event)
sub_future, _ = self.mqtt_connection.subscribe(
topic=topic,
qos=qos,
callback=callback_wrapper,
)
sub_future.add_done_callback(on_suback)
except Exception as e:
future.set_exception(e)
return future, topic