rust/azure_iot_operations_mqtt/src/session.rs (66 lines of code) (raw):

// Copyright (c) Microsoft Corporation. // Licensed under the MIT License. //! MQTT client providing a managed connection with automatic reconnection across a single MQTT session. //! //! This module provides several key components for using an MQTT session: //! * [`Session`] - Manages the lifetime of the MQTT session //! * [`SessionManagedClient`] - Sends MQTT messages to the broker //! * [`SessionPubReceiver`] - Receives MQTT messages from the broker //! * [`SessionConnectionMonitor`] - Provides information about MQTT connection state //! * [`SessionExitHandle`] - Allows the user to exit the session gracefully //! //! # [`Session`] lifespan //! Each instance of [`Session`] is single use - after configuring a [`Session`], and creating any //! other necessary components from it, calling the [`run`](crate::session::Session::run) method //! will consume the [`Session`] and block (asynchronously) until the MQTT session shared between //! client and broker ends. Note that a MQTT session can span multiple connects and disconnects to //! the broker. //! //! The MQTT session can be ended one of three ways: //! 1. The MQTT broker ends the MQTT session //! 2. The [`ReconnectPolicy`](crate::session::reconnect_policy::ReconnectPolicy) configured on the //! [`Session`] halts reconnection attempts, causing the [`Session`] to end the MQTT session. //! 3. The user uses the [`SessionExitHandle`] to end the MQTT session. //! <div class="warning">The SessionExitHandle currently only causes the exit of the Session client //! not the end of the MQTT session shared with the broker. This limitation will be fixed in future //! updates.</div> //! //! # Sending and receiving data over MQTT //! A [`Session`] can be used to create a [`SessionManagedClient`] for sending data (i.e. outgoing //! MQTT PUBLISH, MQTT SUBSCRIBE, MQTT UNSUBSCRIBE), and can in turn be used to create a //! [`SessionPubReceiver`] for receiving incoming data (i.e. incoming MQTT PUBLISH). //! //! [`SessionPubReceiver`]s can be either filtered or unfiltered - a filtered receiver will only //! receive messages that match a specific topic filter, while an unfiltered receiver will receive //! all messages that do not match another existing filter. //! //! Note that in order to receive incoming data, you must both subscribe to the topic filter of //! interest using the [`SessionManagedClient`] and create a [`SessionPubReceiver`] (filtered or //! unfiltered). If an incoming message is received that //! does not match any [`SessionPubReceiver`]s, it will be acknowledged to the MQTT broker and //! discarded. Thus, in order to guarantee that messages will not be lost, you should create the //! [`SessionPubReceiver`] *before* subscribing to the topic filter. pub mod managed_client; // TODO: This really ought be private, but we need it public for testing pub(crate) mod receiver; pub mod reconnect_policy; #[doc(hidden)] #[allow(clippy::module_inception)] // This isn't ideal naming, but it'd be inconsistent otherwise. pub mod session; // TODO: Make this private and accessible via compile flags mod state; mod wrapper; use std::fmt; use thiserror::Error; use crate::auth::SatAuthContextInitError; use crate::error::{ConnectionError, DisconnectError}; use crate::rumqttc_adapter as adapter; pub use wrapper::*; /// Error describing why a [`Session`] ended prematurely #[derive(Debug, Error)] #[error(transparent)] pub struct SessionError(#[from] SessionErrorRepr); /// Internal error for [`Session`] runs. #[derive(Error, Debug)] enum SessionErrorRepr { /// MQTT session was lost due to a connection error. #[error("session state not present on broker after reconnect")] SessionLost, /// MQTT session was ended due to an unrecoverable connection error #[error(transparent)] ConnectionError(#[from] ConnectionError), /// Reconnect attempts were halted by the reconnect policy, ending the MQTT session #[error("reconnection halted by reconnect policy")] ReconnectHalted, /// The [`Session`] was ended by a user-initiated force exit. The broker may still retain the MQTT session. #[error("session ended by force exit")] ForceExit, /// The [`Session`] was ended by an IO error. #[error("{0}")] IoError(#[from] std::io::Error), /// The [`Session`] was ended by an error in the SAT auth context. #[error("{0}")] SatAuthError(#[from] SatAuthContextInitError), } /// Error configuring a [`Session`]. #[derive(Error, Debug)] #[error(transparent)] pub struct SessionConfigError(#[from] adapter::MqttAdapterError); /// Error type for exiting a [`Session`] using the [`SessionExitHandle`]. #[derive(Error, Debug)] #[error("{kind} (network attempt = {attempted})")] pub struct SessionExitError { attempted: bool, kind: SessionExitErrorKind, } impl SessionExitError { /// Return the corresponding [`SessionExitErrorKind`] for this error #[must_use] pub fn kind(&self) -> SessionExitErrorKind { self.kind } /// Return whether a network attempt was made before the error occurred #[must_use] pub fn attempted(&self) -> bool { self.attempted } } impl From<DisconnectError> for SessionExitError { fn from(_: DisconnectError) -> Self { Self { attempted: true, kind: SessionExitErrorKind::Detached, } } } /// An enumeration of categories of [`SessionExitError`] #[derive(Debug, Clone, Copy, Eq, PartialEq)] pub enum SessionExitErrorKind { /// The exit handle was detached from the session Detached, /// The broker could not be reached BrokerUnavailable, } impl fmt::Display for SessionExitErrorKind { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { SessionExitErrorKind::Detached => { write!(f, "Detached from Session") } SessionExitErrorKind::BrokerUnavailable => write!(f, "Could not contact broker"), } } }