rust/azure_iot_operations_services/src/common.rs (54 lines of code) (raw):
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
//! Contains common modules shared between the clients for the various services of Azure IoT Operations
// NOTE: submodules should be behind the feature flags of the clients that use them to ensure they
// are only compiled when necessary.
#[cfg(feature = "state_store")]
pub mod dispatcher {
//! Provides a convenience type for dispatching messages to a receiver based on an ID.
use std::{collections::HashMap, sync::Mutex};
use thiserror::Error;
use tokio::sync::mpsc::{
UnboundedReceiver, UnboundedSender, error::SendError, unbounded_channel,
};
/// Receiving half of the unbounded channel returned by [`Dispatcher::register_receiver`].
pub type Receiver<T> = UnboundedReceiver<T>;
/// Error when registering a new receiver
#[derive(Error, Debug)]
pub enum RegisterError {
/// A receiver is already registered under the contained ID.
#[error("receiver with id {0} already registered")]
AlreadyRegistered(String),
}
/// Error when dispatching a message to a receiver
///
/// `T` is the type of the message being dispatched.
#[derive(Error, Debug)]
pub enum DispatchError<T> {
/// Error when trying to send a message to a receiver.
///
/// Wraps the channel's [`SendError`]; its display output and source are forwarded unchanged.
// NOTE(review): per tokio's docs the wrapped `SendError` hands back the undelivered message and
// is produced when the receiving half has been closed or dropped — confirm against tokio.
#[error(transparent)]
SendError(#[from] SendError<T>),
/// Error when trying to find a receiver by ID; carries the ID that was looked up.
#[error("receiver with id {0} not found")]
NotFound(String),
}
/// Dispatches messages to receivers based on ID
///
/// Each registered receiver ID maps to the sending half of its own unbounded channel. The map
/// sits behind a [`Mutex`], so every operation takes `&self`.
pub struct Dispatcher<T> {
    /// Sending halves of the registered channels, keyed by receiver ID.
    tx_map: Mutex<HashMap<String, UnboundedSender<T>>>,
}

// Implemented by hand instead of `#[derive(Default)]`: the derive would emit
// `impl<T: Default> Default for Dispatcher<T>`, even though an empty map needs no `T` value.
// The manual impl makes `Dispatcher::<T>::default()` available for every message type.
impl<T> Default for Dispatcher<T> {
    /// Returns an empty dispatcher with no registered receivers.
    fn default() -> Self {
        Self {
            tx_map: Mutex::new(HashMap::new()),
        }
    }
}
impl<T> Dispatcher<T> {
/// Returns a new instance of Dispatcher
pub fn new() -> Self {
Self {
tx_map: Mutex::new(HashMap::new()),
}
}
/// Registers a new receiver with the given ID, returning the new receiver.
///
/// Returns an error if a receiver with the same ID is already registered
pub fn register_receiver(&self, receiver_id: String) -> Result<Receiver<T>, RegisterError> {
let mut tx_map = self.tx_map.lock().unwrap();
if tx_map.get(&receiver_id).is_some() {
return Err(RegisterError::AlreadyRegistered(receiver_id));
}
let (tx, rx) = unbounded_channel();
tx_map.insert(receiver_id, tx);
Ok(rx)
}
/// Unregisters a receiver with the given ID, if it exists.
/// This closes the associated channel.
///
/// Returns true if a receiver was unregistered, returns false if the provided ID
/// was not associated with a registered receiver.
pub fn unregister_receiver(&self, receiver_id: &str) -> bool {
self.tx_map.lock().unwrap().remove(receiver_id).is_some()
}
/// Unregisters all receivers, returning the number of receivers that were unregistered.
/// This closes all associated channels.
pub fn unregister_all(&self) -> usize {
self.tx_map.lock().unwrap().drain().count()
}
/// Dispatches a message to the receiver associated with the provided ID.
pub fn dispatch(&self, receiver_id: &str, message: T) -> Result<(), DispatchError<T>> {
if let Some(tx) = self.tx_map.lock().unwrap().get(receiver_id) {
Ok(tx.send(message)?)
} else {
Err(DispatchError::NotFound(receiver_id.to_string()))
}
}
}
}