rust/azure_iot_operations_mqtt/src/interface.rs (100 lines of code) (raw):

// Copyright (c) Microsoft Corporation. // Licensed under the MIT License. //! Traits and types for defining sets and subsets of MQTT client functionality. use async_trait::async_trait; use bytes::Bytes; use crate::control_packet::{ AuthProperties, Publish, PublishProperties, QoS, SubscribeProperties, UnsubscribeProperties, }; use crate::error::{ AckError, CompletionError, ConnectionError, DisconnectError, PublishError, ReauthError, SubscribeError, UnsubscribeError, }; pub use crate::session::receiver::AckToken; // TODO: remove this pub re-export after concretized receivers / managed clients use crate::topic::TopicParseError; // ---------- Concrete Types ---------- /// Awaitable token indicating completion of MQTT message delivery. pub struct CompletionToken( pub Box<dyn std::future::Future<Output = Result<(), CompletionError>> + Send>, ); impl std::future::Future for CompletionToken { type Output = Result<(), CompletionError>; fn poll( self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll<Self::Output> { // NOTE: Need to use `unsafe` here because we need to poll the inner future, but can't get // a mutable reference to it, as it's in a box (at least, not without unsafe code). // It is safe for us to use the `unsafe` code for the following reasons: // // 1. This CompletionToken struct is the only reference to the boxed future that it holds // internally, so mutability among multiple references is not a concern. Nowhere is this // struct used in a way that the inner future can be accessed from multiple threads, // and so we are safe from race conditions. // // 2. We do not move the memory - the future stays right where it is, all we do is poll it, // so there is no risk of a null pointer error / segfault. let inner = unsafe { self.map_unchecked_mut(|s| &mut *s.0) }; inner.poll(cx) } } // Re-export rumqttc types to avoid user code taking the dependency. // TODO: Re-implement these instead of just aliasing / add to rumqttc adapter // Only once there are non-rumqttc implementations of these can we allow non-rumqttc compilations /// Event yielded by the event loop pub type Event = rumqttc::v5::Event; /// Incoming data on the event loop pub type Incoming = rumqttc::v5::Incoming; /// Outgoing data on the event loop pub type Outgoing = rumqttc::Outgoing; // ---------- Lower level MQTT abstractions ---------- /// MQTT publish, subscribe and unsubscribe functionality #[async_trait] pub trait MqttPubSub { /// MQTT Publish /// /// If connection is unavailable, publish will be queued and delivered when connection is re-established. /// Blocks if at capacity for queueing. async fn publish( &self, topic: impl Into<String> + Send, qos: QoS, retain: bool, payload: impl Into<Bytes> + Send, ) -> Result<CompletionToken, PublishError>; /// MQTT Publish /// /// If connection is unavailable, publish will be queued and delivered when connection is re-established. /// Blocks if at capacity for queueing. async fn publish_with_properties( &self, topic: impl Into<String> + Send, qos: QoS, retain: bool, payload: impl Into<Bytes> + Send, properties: PublishProperties, ) -> Result<CompletionToken, PublishError>; /// MQTT Subscribe /// /// If connection is unavailable, subscribe will be queued and delivered when connection is re-established. /// Blocks if at capacity for queueing. async fn subscribe( &self, topic: impl Into<String> + Send, qos: QoS, ) -> Result<CompletionToken, SubscribeError>; /// MQTT Subscribe /// /// If connection is unavailable, subscribe will be queued and delivered when connection is re-established. /// Blocks if at capacity for queueing. async fn subscribe_with_properties( &self, topic: impl Into<String> + Send, qos: QoS, properties: SubscribeProperties, ) -> Result<CompletionToken, SubscribeError>; /// MQTT Unsubscribe /// /// If connection is unavailable, unsubscribe will be queued and delivered when connection is re-established. /// Blocks if at capacity for queueing. async fn unsubscribe( &self, topic: impl Into<String> + Send, ) -> Result<CompletionToken, UnsubscribeError>; /// MQTT Unsubscribe /// /// If connection is unavailable, unsubscribe will be queued and delivered when connection is re-established. /// Blocks if at capacity for queueing. async fn unsubscribe_with_properties( &self, topic: impl Into<String> + Send, properties: UnsubscribeProperties, ) -> Result<CompletionToken, UnsubscribeError>; } /// Provides functionality for acknowledging a received Publish message (QoS 1) #[async_trait] pub trait MqttAck { /// Acknowledge a received Publish. async fn ack(&self, publish: &Publish) -> Result<CompletionToken, AckError>; } // TODO: consider scoping this to also include a `connect`. Not currently needed, but would be more flexible, // and make a lot more sense /// MQTT disconnect functionality #[async_trait] pub trait MqttDisconnect { /// Disconnect from the MQTT broker. async fn disconnect(&self) -> Result<(), DisconnectError>; } /// Internally-facing APIs for the underlying client. /// Use of this trait is not currently recommended except for mocking. #[async_trait] pub trait MqttClient: MqttPubSub + MqttAck + MqttDisconnect { /// Reauthenticate with the MQTT broker async fn reauth(&self, auth_props: AuthProperties) -> Result<(), ReauthError>; } /// MQTT Event Loop manipulation #[async_trait] pub trait MqttEventLoop { /// Poll the event loop for the next [`Event`] async fn poll(&mut self) -> Result<Event, ConnectionError>; /// Modify the clean start flag for subsequent MQTT connection attempts fn set_clean_start(&mut self, clean_start: bool); /// Set the authentication method fn set_authentication_method(&mut self, authentication_method: Option<String>); /// Set the authentication data fn set_authentication_data(&mut self, authentication_data: Option<Bytes>); } // ---------- Higher level MQTT abstractions ---------- /// An MQTT client that has it's connection state externally managed. /// Can be used to send messages and create receivers for incoming messages. pub trait ManagedClient: MqttPubSub { /// The type of receiver used by this client type PubReceiver: PubReceiver; /// Get the client id for the MQTT connection fn client_id(&self) -> &str; /// Creates a new [`PubReceiver`] that receives messages on a specific topic /// /// # Errors /// Returns a [`TopicParseError`] if the pub receiver cannot be registered. fn create_filtered_pub_receiver( &self, topic_filter: &str, ) -> Result<Self::PubReceiver, TopicParseError>; /// Creates a new [`PubReceiver`] that receives all messages not sent to other /// filtered receivers. fn create_unfiltered_pub_receiver(&self) -> Self::PubReceiver; } #[async_trait] /// Receiver for incoming MQTT messages. pub trait PubReceiver { /// Receives the next incoming publish. /// /// Return None if there will be no more incoming publishes. async fn recv(&mut self) -> Option<Publish>; /// Receives the next incoming publish, and a token that can be used to manually acknowledge /// the publish (Quality of Service 1 or 2), or `None` (Quality of Service 0). /// /// Return None if there will be no more incoming publishes. async fn recv_manual_ack(&mut self) -> Option<(Publish, Option<AckToken>)>; /// Close the receiver, preventing further incoming publishes. /// /// To guarantee no publish loss, `recv()`/`recv_manual_ack()` must be called until `None` is returned. fn close(&mut self); }