src/metrics/metricer.rs (96 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one or more // contributor license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright ownership. // The ASF licenses this file to You under the Apache License, Version 2.0 // (the "License"); you may not use this file except in compliance with // the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // //! Metricer methods. use super::meter::{MeterId, Transform}; use crate::reporter::{CollectItem, DynReport, Report}; use std::{ collections::HashMap, future::Future, pin::Pin, sync::Arc, task::{Context, Poll}, time::Duration, }; use tokio::{ select, spawn, sync::mpsc, task::{spawn_blocking, JoinError, JoinHandle}, time::interval, }; /// Metricer handles skywalking metrics operations, integrate with reporter, can /// be register with multiple [Transform]. pub struct Metricer { service_name: String, instance_name: String, reporter: Box<DynReport>, meter_map: HashMap<MeterId, Arc<dyn Transform>>, report_interval: Duration, } impl Metricer { /// New with service info and reporter. pub fn new( service_name: impl Into<String>, instance_name: impl Into<String>, reporter: impl Report + Send + Sync + 'static, ) -> Self { Self { service_name: service_name.into(), instance_name: instance_name.into(), reporter: Box::new(reporter), meter_map: Default::default(), report_interval: Duration::from_secs(20), } } /// Get service name. pub fn service_name(&self) -> &str { &self.service_name } /// Get instance name. pub fn instance_name(&self) -> &str { &self.instance_name } /// Set report interval, default is 20s. pub fn set_report_interval(&mut self, report_interval: Duration) { self.report_interval = report_interval; } /// Register new [Transform], and return it with [Arc] wrap. pub fn register<T: Transform + 'static>(&mut self, transform: T) -> Arc<T> { let transform = Arc::new(transform); self.meter_map .insert(transform.meter_id(), transform.clone()); transform } /// Boot the reporting with the report interval previous set, will be run in /// background. pub fn boot(self) -> Booting { let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1); let handle = spawn(async move { let mut ticker = interval(self.report_interval); let metricer = Arc::new(self); loop { let metricer_ = metricer.clone(); let _ = spawn_blocking(move || { for trans in metricer_.meter_map.values() { metricer_ .reporter .report(CollectItem::Meter(Box::new(trans.transform(&metricer_)))); } }) .await; select! { _ = ticker.tick() => {} _ = shutdown_rx.recv() => { return; } } } }); Booting { handle, shutdown_tx, } } } /// handle of [Metricer::boot]. pub struct Booting { handle: JoinHandle<()>, shutdown_tx: mpsc::Sender<()>, } impl Booting { /// Shutdown the metrics reporting. pub async fn shutdown(self) -> crate::Result<()> { self.shutdown_tx.send(()).await.unwrap(); Ok(self.await?) } } impl Future for Booting { type Output = Result<(), JoinError>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { Pin::new(&mut self.handle).poll(cx) } }