proxy_agent/src/shared_state/telemetry_wrapper.rs (78 lines of code) (raw):

// Copyright (c) Microsoft Corporation // SPDX-License-Identifier: MIT //! This module contains the logic to interact with the telemetry module. //! Example //! ```rust //! use proxy_agent::shared_state::telemetry_wrapper::TelemetrySharedState; //! use proxy_agent::telemetry::event_reader::VmMetaData; //! //! let telemetry_shared_state = TelemetrySharedState::start_new(); //! let vm_meta_data = VmMetaData::new("vm_id".to_string(), "vm_name".to_string()); //! telemetry_shared_state.set_vm_meta_data(Some(vm_meta_data.clone())).await.unwrap(); //! let meta_data = telemetry_shared_state.get_vm_meta_data().await.unwrap().unwrap(); //! assert_eq!(meta_data, vm_meta_data); //! ``` use crate::common::result::Result; use crate::common::{error::Error, logger}; use crate::telemetry::event_reader::VmMetaData; use tokio::sync::{mpsc, oneshot}; enum TelemetryAction { SetVmMetaData { vm_meta_data: Option<VmMetaData>, response: oneshot::Sender<()>, }, GetVmMetaData { response: oneshot::Sender<Option<VmMetaData>>, }, } #[derive(Clone, Debug)] pub struct TelemetrySharedState(mpsc::Sender<TelemetryAction>); impl TelemetrySharedState { pub fn start_new() -> Self { let (sender, mut receiver) = mpsc::channel(100); tokio::spawn(async move { let mut vm_meta_data: Option<VmMetaData> = None; loop { match receiver.recv().await { Some(TelemetryAction::SetVmMetaData { vm_meta_data: meta_data, response, }) => { vm_meta_data = meta_data.clone(); if response.send(()).is_err() { logger::write_warning(format!( "Failed to send response to TelemetryAction::SetVmMetaData '{:?}'", meta_data, )); } } Some(TelemetryAction::GetVmMetaData { response }) => { if let Err(meta_data) = response.send(vm_meta_data.clone()) { logger::write_warning(format!( "Failed to send response to TelemetryAction::GetVmMetaData '{:?}'", meta_data, )); } } None => { break; } } } }); Self(sender) } pub async fn set_vm_meta_data(&self, vm_meta_data: Option<VmMetaData>) -> Result<()> { let (response, receiver) = oneshot::channel(); self.0 .send(TelemetryAction::SetVmMetaData { vm_meta_data, response, }) .await .map_err(|e| { Error::SendError("TelemetryAction::SetVmMetaData".to_string(), e.to_string()) })?; receiver .await .map_err(|e| Error::RecvError("TelemetryAction::SetVmMetaData".to_string(), e)) } pub async fn get_vm_meta_data(&self) -> Result<Option<VmMetaData>> { let (response, receiver) = oneshot::channel(); self.0 .send(TelemetryAction::GetVmMetaData { response }) .await .map_err(|e| { Error::SendError("TelemetryAction::GetVmMetaData".to_string(), e.to_string()) })?; receiver .await .map_err(|e| Error::RecvError("TelemetryAction::GetVmMetaData".to_string(), e)) } }