shed/scuba_sample/src/builder.rs (196 lines of code) (raw):

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * This source code is licensed under both the MIT license found in the
 * LICENSE-MIT file in the root directory of this source tree and the Apache
 * License, Version 2.0 found in the LICENSE-APACHE file in the root directory
 * of this source tree.
 */

//! See the [ScubaSampleBuilder] documentation

use fbinit::FacebookInit;
use serde_json::{Error, Value};
use std::collections::hash_map::Entry;
use std::fmt;
use std::fs::{File, OpenOptions};
use std::io::{Error as IoError, Write};
use std::num::NonZeroU64;
use std::path::Path;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;

use crate::sample::ScubaSample;
use crate::value::ScubaValue;
use crate::Sampling;

/// A helper builder to make it easier to create a new sample and log it into
/// the proper Scuba dataset.
#[derive(Clone)]
pub struct ScubaSampleBuilder {
    /// The sample being built; every `add`/`remove`/`get` call is forwarded to it.
    sample: ScubaSample,
    /// Optional log file that samples are appended to. Behind an `Arc`, so
    /// clones of this builder write to the same file handle.
    log_file: Option<Arc<Mutex<File>>>,
    /// Current sampling state, consulted on every log call.
    sampling: Sampling,
    /// Optional `(field name, counter)` pair used for log sequencing. Behind
    /// an `Arc`, so clones of this builder share the same counter.
    seq: Option<Arc<(String, AtomicU64)>>,
}

impl ScubaSampleBuilder {
    /// Create a new instance of the Builder with initially an empty sample
    /// that will preserve the sample in the provided dataset. The arguments
    /// are used only in fbcode builds.
    pub fn new<T: Into<String>>(_fb: FacebookInit, _dataset: T) -> Self {
        Self::with_discard()
    }

    /// Create a new instance of the Builder with initially an empty sample
    /// that will discard the sample instead of writing it to a Scuba dataset.
    pub fn with_discard() -> Self {
        Self {
            sample: ScubaSample::new(),
            log_file: None,
            sampling: Sampling::NoSampling,
            seq: None,
        }
    }

    /// Configure this Builder to preserve logged samples in the provided log
    /// file. The file is created if it does not exist and is opened in append
    /// mode.
    ///
    /// # Errors
    ///
    /// Returns the underlying I/O error if the file cannot be opened.
    pub fn with_log_file<L: AsRef<Path>>(mut self, log_file: L) -> Result<Self, IoError> {
        let file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(log_file)?;
        self.log_file = Some(Arc::new(Mutex::new(file)));
        Ok(self)
    }

    /// Enable log sequencing. Each sample from this builder (or its clones)
    /// will get a monotonically incrementing sequence number logged in the
    /// named field with each log. The sequence starts at 0.
    pub fn with_seq(mut self, key: impl Into<String>) -> Self {
        self.seq = Some(Arc::new((key.into(), AtomicU64::new(0))));
        self
    }

    /// Return true if a client is not set for this builder. This method will
    /// return true even if a log file is provided and the sample will be
    /// preserved in it.
    pub fn is_discard(&self) -> bool {
        // This implementation never has a client, so it always discards.
        true
    }

    /// Call the internal sample's [super::sample::ScubaSample::add] method
    pub fn add<K: Into<String>, V: Into<ScubaValue>>(&mut self, key: K, value: V) -> &mut Self {
        self.sample.add(key, value);
        self
    }

    /// Call the internal sample's [super::sample::ScubaSample::add] method
    /// if the specified value is not `None`.
    pub fn add_opt<K: Into<String>, V: Into<ScubaValue>>(
        &mut self,
        key: K,
        value: Option<V>,
    ) -> &mut Self {
        self.sample.add_opt(key, value);
        self
    }

    /// Call the internal sample's [super::sample::ScubaSample::remove] method
    pub fn remove<K: Into<String>>(&mut self, key: K) -> &mut Self {
        self.sample.remove(key);
        self
    }

    /// Call the internal sample's [super::sample::ScubaSample::get] method
    pub fn get<K: Into<String>>(&self, key: K) -> Option<&ScubaValue> {
        self.sample.get(key)
    }

    /// Call the internal sample's [super::sample::ScubaSample::entry] method
    pub fn entry<K: Into<String>>(&mut self, key: K) -> Entry<String, ScubaValue> {
        self.sample.entry(key)
    }

    /// Only log one in sample_rate samples. The decision is made at the point where sampled() is
    /// called. Multiple calls to sampled() further reduce the logging probability.
    pub fn sampled(&mut self, sample_rate: NonZeroU64) -> &mut Self {
        self.sampling = self.sampling.sample(&mut rand::thread_rng(), sample_rate);
        self
    }

    /// Revert sampling.
    pub fn unsampled(&mut self) -> &mut Self {
        self.sampling = Sampling::NoSampling;
        self
    }

    /// Access this builder's underlying [Sampling].
    pub fn sampling(&self) -> &Sampling {
        &self.sampling
    }

    /// Get a reference to the internally built sample.
    pub fn get_sample(&self) -> &ScubaSample {
        &self.sample
    }

    /// Get a mutable reference to the internally built sample.
    pub fn get_sample_mut(&mut self) -> &mut ScubaSample {
        &mut self.sample
    }

    /// Set the [subset] of this sample.
    ///
    /// [subset]: https://fburl.com/qa/xqm9hsxx
    pub fn set_subset<S: Into<String>>(&mut self, subset: S) -> &mut Self {
        self.sample.set_subset(subset);
        self
    }

    /// Clear the [subset] of this sample.
    ///
    /// [subset]: https://fburl.com/qa/xqm9hsxx
    pub fn clear_subset(&mut self) -> &mut Self {
        self.sample.clear_subset();
        self
    }

    /// Update the sequence number in preparation for a new log operation.
    /// Does nothing unless sequencing was enabled with [Self::with_seq].
    fn next_seq(&mut self) {
        if let Some((key, seq)) = self.seq.as_deref() {
            // `fetch_add` returns the previous value, so the first logged
            // sequence number is the counter's initial value.
            let next_seq = seq.fetch_add(1, Ordering::Relaxed);
            self.sample.add(key, next_seq);
        }
    }

    /// Append the current sample as a single JSON line to the configured log
    /// file, if there is one. Logging is best effort: a sample that fails to
    /// serialize is skipped and write errors are ignored.
    fn write_to_log_file(&self) {
        if let Some(log_file) = &self.log_file {
            if let Ok(sample) = self.sample.to_json() {
                // Serialize before taking the lock, and emit the record and its
                // trailing newline in one write so a line is never split in two.
                let mut line = sample.to_string();
                line.push('\n');
                let mut log_file = log_file.lock().expect("Poisoned lock");
                let _ = log_file.write_all(line.as_bytes());
            }
        }
    }

    /// Log the internally built sample to the previously configured log file while overriding its
    /// timestamp to the current time. Returns whether the sample passed sampling.
    pub fn log(&mut self) -> bool {
        self.sample.set_time_now();
        self.next_seq();
        if !self.sampling.apply(&mut self.sample) {
            return false;
        }
        self.write_to_log_file();
        true
    }

    /// Log the internally built sample to the previously configured log file while overriding its
    /// timestamp to the provided time. Returns whether the sample passed sampling.
    pub fn log_with_time(&mut self, time: u64) -> bool {
        self.sample.set_time(time);
        self.next_seq();
        if !self.sampling.apply(&mut self.sample) {
            // Sampled out: nothing is written, and the caller is told so,
            // exactly as `log` does.
            return false;
        }
        self.write_to_log_file();
        true
    }

    /// Either flush the configured client with the provided timeout or flush
    /// the configured log file making sure all the logged samples have been
    /// written to it. The timeout is used only in fbcode builds.
    pub fn flush(&self, _timeout: Duration) {
        if let Some(ref log_file) = self.log_file {
            let mut log_file = log_file.lock().expect("Poisoned lock");
            let _ = log_file.flush();
        }
    }

    /// Return a json serialized sample
    pub fn to_json(&self) -> Result<Value, Error> {
        self.sample.to_json()
    }

    /// Add values to the sample that are widely used in Facebook services. For
    /// non-fbcode-builds it does nothing. The provided mapper function is used
    /// to transform the values before they are written to the sample.
    pub fn add_mapped_common_server_data<F>(&mut self, _mapper: F) -> &mut Self
    where
        F: Fn(ServerData) -> &'static str,
    {
        self
    }

    /// Add values to the sample that are widely used in Facebook services. For
    /// non-fbcode-builds it does nothing.
    pub fn add_common_server_data(&mut self) -> &mut Self {
        self.add_mapped_common_server_data(|data| data.default_key())
    }

    /// Call the internal sample's [super::sample::ScubaSample::join_values] method
    pub fn join_values(&mut self, sample: &ScubaSample) {
        self.sample.join_values(sample)
    }
}

impl fmt::Debug for ScubaSampleBuilder {
    /// Print only the sample; the log file, sampling state and sequence
    /// counter are omitted.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "ScubaSampleBuilder {{ sample: {:?} }}", self.sample)
    }
}

/// Enum representing commonly used server data written to the Scuba sample.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ServerData {
    /// Hostname of the server
    Hostname,
    /// Tier of the service
    Tier,
    /// Tupperware TaskId of the service
    TaskId,
    /// Tupperware CanaryId of the service
    CanaryId,
    /// Tupperware JobHandle of the service
    JobHandle,
    /// Build revision of the current binary
    BuildRevision,
    /// Build rule of the current binary
    BuildRule,
    /// Chronos cluster
    ScheduledJobCluster,
    /// Chronos job instance id
    ScheduledJobInstanceId,
    /// Chronos job name
    ScheduledJobName,
}

impl ServerData {
    /// Return a unique key for the server data under which the value will be
    /// stored in the sample. Pay attention not to use the same keys if you don't
    /// wish to override those values.
    pub fn default_key(&self) -> &'static str {
        // Deliberately no catch-all arm: adding a variant must fail to compile
        // until it is given a key here.
        match self {
            ServerData::Hostname => "server_hostname",
            ServerData::Tier => "server_tier",
            ServerData::TaskId => "tw_task_id",
            ServerData::CanaryId => "tw_canary_id",
            ServerData::JobHandle => "tw_handle",
            ServerData::BuildRevision => "build_revision",
            ServerData::BuildRule => "build_rule",
            ServerData::ScheduledJobCluster => "chronos_cluster",
            ServerData::ScheduledJobInstanceId => "chronos_job_instance_id",
            ServerData::ScheduledJobName => "chronos_job_name",
        }
    }
}