rust/azure_iot_operations_mqtt/src/session/wrapper.rs (176 lines of code) (raw):

// Copyright (c) Microsoft Corporation. // Licensed under the MIT License. use std::time::Duration; use async_trait::async_trait; use bytes::Bytes; use crate::MqttConnectionSettings; use crate::control_packet::{ Publish, PublishProperties, QoS, SubscribeProperties, UnsubscribeProperties, }; use crate::error::{PublishError, SubscribeError, UnsubscribeError}; use crate::interface::{AckToken, CompletionToken, ManagedClient, MqttPubSub, PubReceiver}; use crate::rumqttc_adapter as adapter; use crate::session::managed_client; use crate::session::reconnect_policy::{ExponentialBackoffWithJitter, ReconnectPolicy}; use crate::session::session; use crate::session::{SessionConfigError, SessionError, SessionExitError}; use crate::topic::TopicParseError; /// Client that manages connections over a single MQTT session. /// /// Use this centrally in an application to control the session and to create /// instances of [`SessionManagedClient`] and [`SessionExitHandle`]. pub struct Session(session::Session<adapter::ClientAlias, adapter::EventLoopAlias>); /// Handle used to end an MQTT session. /// /// PLEASE NOTE WELL /// This struct's API is designed around negotiating a graceful exit with the MQTT broker. /// However, this is not actually possible right now due to a bug in underlying MQTT library. #[derive(Clone)] pub struct SessionExitHandle(session::SessionExitHandle<adapter::ClientAlias>); /// Monitor for connection changes in the [`Session`]. /// /// This is largely for informational purposes. #[derive(Clone)] pub struct SessionConnectionMonitor(session::SessionConnectionMonitor); /// An MQTT client that has it's connection state externally managed by a [`Session`]. /// Can be used to send messages and create receivers for incoming messages. #[derive(Clone)] pub struct SessionManagedClient(managed_client::SessionManagedClient<adapter::ClientAlias>); /// Receive and acknowledge incoming MQTT messages. pub struct SessionPubReceiver(managed_client::SessionPubReceiver); /// Options for configuring a new [`Session`] #[derive(Builder)] #[builder(pattern = "owned")] pub struct SessionOptions { /// MQTT Connection Settings for configuring the [`Session`] pub connection_settings: MqttConnectionSettings, /// Reconnect Policy to by used by the `Session` #[builder(default = "Box::new(ExponentialBackoffWithJitter::default())")] pub reconnect_policy: Box<dyn ReconnectPolicy>, /// Maximum number of queued outgoing messages not yet accepted by the MQTT Session #[builder(default = "100")] pub outgoing_max: usize, /// Indicates if the Session should use features specific for use with the AIO MQTT Broker #[builder(default = "true")] pub aio_broker_features: bool, } impl Session { /// Create a new [`Session`] with the provided options structure. /// /// # Errors /// Returns a [`SessionConfigError`] if there are errors using the session options. pub fn new(options: SessionOptions) -> Result<Self, SessionConfigError> { let client_id = options.connection_settings.client_id.clone(); let sat_file = options.connection_settings.sat_file.clone(); // Add AIO metric to user properties when using AIO MQTT broker features // TODO: consider user properties from being supported on SessionOptions or ConnectionSettings let user_properties = if options.aio_broker_features { vec![("metriccategory".into(), "aiosdk-rust".into())] } else { vec![] }; let (client, event_loop) = adapter::client( options.connection_settings, options.outgoing_max, true, user_properties, )?; Ok(Session(session::Session::new_from_injection( client, event_loop, options.reconnect_policy, client_id, sat_file, ))) } /// Return a new instance of [`SessionExitHandle`] that can be used to end this [`Session`] pub fn create_exit_handle(&self) -> SessionExitHandle { SessionExitHandle(self.0.create_exit_handle()) } /// Return a new instance of [`SessionConnectionMonitor`] that can be used to monitor the connection state pub fn create_connection_monitor(&self) -> SessionConnectionMonitor { SessionConnectionMonitor(self.0.create_connection_monitor()) } /// Return a new instance of [`SessionManagedClient`] that can be used to send and receive messages pub fn create_managed_client(&self) -> SessionManagedClient { SessionManagedClient(self.0.create_managed_client()) } /// Begin running the [`Session`]. /// /// Blocks until either a session exit or a fatal connection error is encountered. /// /// # Errors /// Returns a [`SessionError`] if the session encounters a fatal error and ends. pub async fn run(self) -> Result<(), SessionError> { self.0.run().await } } impl ManagedClient for SessionManagedClient { type PubReceiver = SessionPubReceiver; fn client_id(&self) -> &str { self.0.client_id() } fn create_filtered_pub_receiver( &self, topic_filter: &str, ) -> Result<SessionPubReceiver, TopicParseError> { Ok(SessionPubReceiver( self.0.create_filtered_pub_receiver(topic_filter)?, )) } fn create_unfiltered_pub_receiver(&self) -> SessionPubReceiver { SessionPubReceiver(self.0.create_unfiltered_pub_receiver()) } } #[async_trait] impl MqttPubSub for SessionManagedClient { async fn publish( &self, topic: impl Into<String> + Send, qos: QoS, retain: bool, payload: impl Into<Bytes> + Send, ) -> Result<CompletionToken, PublishError> { self.0.publish(topic, qos, retain, payload).await } async fn publish_with_properties( &self, topic: impl Into<String> + Send, qos: QoS, retain: bool, payload: impl Into<Bytes> + Send, properties: PublishProperties, ) -> Result<CompletionToken, PublishError> { self.0 .publish_with_properties(topic, qos, retain, payload, properties) .await } async fn subscribe( &self, topic: impl Into<String> + Send, qos: QoS, ) -> Result<CompletionToken, SubscribeError> { self.0.subscribe(topic, qos).await } async fn subscribe_with_properties( &self, topic: impl Into<String> + Send, qos: QoS, properties: SubscribeProperties, ) -> Result<CompletionToken, SubscribeError> { self.0 .subscribe_with_properties(topic, qos, properties) .await } async fn unsubscribe( &self, topic: impl Into<String> + Send, ) -> Result<CompletionToken, UnsubscribeError> { self.0.unsubscribe(topic).await } async fn unsubscribe_with_properties( &self, topic: impl Into<String> + Send, properties: UnsubscribeProperties, ) -> Result<CompletionToken, UnsubscribeError> { self.0.unsubscribe_with_properties(topic, properties).await } } #[async_trait] impl PubReceiver for SessionPubReceiver { async fn recv(&mut self) -> Option<Publish> { self.0.recv().await } async fn recv_manual_ack(&mut self) -> Option<(Publish, Option<AckToken>)> { self.0.recv_manual_ack().await } fn close(&mut self) { self.0.close(); } } impl SessionExitHandle { /// Attempt to gracefully end the MQTT session running in the [`Session`] that created this handle. /// This will cause the [`Session::run()`] method to return. /// /// Note that a graceful exit requires the [`Session`] to be connected to the broker. /// If the [`Session`] is not connected, this method will return an error. /// If the [`Session`] connection has been recently lost, the [`Session`] may not yet realize this, /// and it can take until up to the keep-alive interval for the [`Session`] to realize it is disconnected, /// after which point this method will return an error. Under this circumstance, the attempt was still made, /// and may eventually succeed even if this method returns the error /// /// # Errors /// * [`SessionExitError`] of kind [`SessionExitErrorKind::Detached`](crate::session::SessionExitErrorKind) if the Session no longer exists. /// * [`SessionExitError`] of kind [`SessionExitErrorKind::BrokerUnavailable`](crate::session::SessionExitErrorKind) if the Session is not connected to the broker. pub async fn try_exit(&self) -> Result<(), SessionExitError> { self.0.try_exit().await } /// Attempt to gracefully end the MQTT session running in the [`Session`] that created this handle. /// This will cause the [`Session::run()`] method to return. /// /// Note that a graceful exit requires the [`Session`] to be connected to the broker. /// If the [`Session`] is not connected, this method will return an error. /// If the [`Session`] connection has been recently lost, the [`Session`] may not yet realize this, /// and it can take until up to the keep-alive interval for the [`Session`] to realize it is disconnected, /// after which point this method will return an error. Under this circumstance, the attempt was still made, /// and may eventually succeed even if this method returns the error /// If the graceful [`Session`] exit attempt does not complete within the specified timeout, this method /// will return an error. /// /// # Arguments /// * `timeout` - The duration to wait for the graceful exit to complete before returning an error. /// /// # Errors /// * [`SessionExitError`] of kind [`SessionExitErrorKind::Detached`](crate::session::SessionExitErrorKind) if the Session no longer exists. /// * [`SessionExitError`] of kind [`SessionExitErrorKind::BrokerUnavailable`](crate::session::SessionExitErrorKind) if the Session is not connected to the broker within the specified timeout interval. pub async fn try_exit_timeout(&self, timeout: Duration) -> Result<(), SessionExitError> { self.0.try_exit_timeout(timeout).await } /// Forcefully end the MQTT session running in the [`Session`] that created this handle. /// This will cause the [`Session::run()`] method to return. /// /// The [`Session`] will be granted a period of 1 second to attempt a graceful exit before /// forcing the exit. If the exit is forced, the broker will not be aware the MQTT session /// has ended. /// /// Returns true if the exit was graceful, and false if the exit was forced. pub async fn exit_force(&self) -> bool { self.0.exit_force().await } } impl SessionConnectionMonitor { /// Returns true if the [`Session`] is currently connected. /// Note that this may not be accurate if connection has been recently lost. #[must_use] pub fn is_connected(&self) -> bool { self.0.is_connected() } /// Wait until the [`Session`] is connected. /// Returns immediately if already connected. pub async fn connected(&self) { self.0.connected().await; } /// Wait until the [`Session`] is disconnected. /// Returns immediately if already disconnected. pub async fn disconnected(&self) { self.0.disconnected().await; } }