resctl-bench/src/bench/iocost_tune/merge.rs (215 lines of code) (raw):

use super::super::*; use super::{DataSel, DataSeries, IoCostTuneBench, IoCostTuneRecord, IoCostTuneResult}; use statrs::distribution::{ContinuousCDF, Normal}; use std::collections::{BTreeMap, HashMap, HashSet}; fn model_to_array(model: &IoCostModelParams) -> [f64; 6] { [ model.rbps as f64, model.rseqiops as f64, model.rrandiops as f64, model.wbps as f64, model.wseqiops as f64, model.wrandiops as f64, ] } fn model_from_array(array: &[f64]) -> IoCostModelParams { IoCostModelParams { rbps: array[0].round() as u64, rseqiops: array[1].round() as u64, rrandiops: array[2].round() as u64, wbps: array[3].round() as u64, wseqiops: array[4].round() as u64, wrandiops: array[5].round() as u64, } } fn merge_model( models: HashSet<IoCostModelParams>, ) -> (IoCostModelParams, HashMap<IoCostModelParams, bool>) { // The bool indicates whether an outlier. let mut models: Vec<(IoCostModelParams, bool)> = models.into_iter().map(|model| (model, false)).collect(); // Convert to arrays of f64's. let mut param_sets: [Vec<f64>; 6] = Default::default(); for model in models.iter() { for (i, v) in model_to_array(&model.0).iter().enumerate() { param_sets[i].push(*v); } } // Filter out outliers if there are more than three models. if models.len() > 3 { let means: Vec<f64> = param_sets .iter() .map(|set| statistical::mean(set)) .collect(); let stdevs: Vec<f64> = param_sets .iter() .map(|set| statistical::standard_deviation(set, None)) .collect(); trace!("merge_model: means={:?} stdevs={:?}", &means, &stdevs); // Apply Chauvenet's criterion on each model parameter to detect and // reject outliers. We reject models with any parameter determined to be // an outlier. for (pi, (&mean, &stdev)) in means.iter().zip(stdevs.iter()).enumerate() { if let Ok(dist) = Normal::new(mean, stdev) { for (mi, &val) in param_sets[pi].iter().enumerate() { let is_outlier = (1.0 - dist.cdf(val)) * (models.len() as f64) < 0.5; trace!( "merge_model: pi={} mean={} stdev={} mi={} val={} is_outlier={}", pi, mean, stdev, mi, val, is_outlier ); models[mi].1 |= is_outlier; } } } } let model_is_outlier: HashMap<IoCostModelParams, bool> = models.into_iter().collect(); // Determine the median model parameters. let mut filtered_sets: [Vec<f64>; 6] = Default::default(); for (model, outlier) in model_is_outlier.iter() { if !outlier { for (i, v) in model_to_array(model).iter().enumerate() { filtered_sets[i].push(*v); } } } for set in filtered_sets.iter_mut() { set.sort_by(|a, b| a.partial_cmp(b).unwrap()); } let medians: Vec<f64> = filtered_sets.iter().map(|set| set[set.len() / 2]).collect(); (model_from_array(&medians), model_is_outlier) } pub fn merge(srcs: &mut Vec<MergeSrc>) -> Result<JobData> { // We only care about distinct models. Weed out duplicates using HashSet. let models: HashSet<IoCostModelParams> = srcs .iter() .map(|src| src.data.sysinfo.iocost.model.knobs.clone()) .collect(); let (median_model, model_is_outlier) = merge_model(models); // Mark outlier sources. for src in srcs.iter_mut().filter(|src| src.rejected.is_none()) { if model_is_outlier[&src.data.sysinfo.iocost.model.knobs] { src.rejected = Some("model is an outlier".to_string()); } } let mut first_valid = None; let mut data = BTreeMap::<DataSel, DataSeries>::default(); for src in srcs.iter_mut().filter(|src| src.rejected.is_none()) { let (rec, res): (IoCostTuneRecord, IoCostTuneResult) = match (src.data.parse_record(), src.data.parse_result()) { (Ok(rec), Ok(res)) => (rec, res), (Err(e), _) | (_, Err(e)) => { src.rejected = Some(format!("failed to parse ({:?})", &e)); debug!( "iocost-tune-merge: {:?} rejected ({})", &src.file, src.rejected.as_ref().unwrap() ); continue; } }; match first_valid.as_ref() { None => first_valid = Some((rec.clone(), res.clone())), Some((_, fres)) => { assert_eq!( (fres.mem_profile, &fres.isol_pct, fres.isol_thr), (res.mem_profile, &res.isol_pct, res.isol_thr) ); } } for (sel, mut src_ds) in res.data.into_iter() { let dst_ds = match data.get_mut(&sel) { Some(ds) => ds, None => { data.insert(sel.clone(), DataSeries::default()); data.get_mut(&sel).unwrap() } }; dst_ds.points.append(&mut src_ds.points); dst_ds.outliers.append(&mut src_ds.outliers); } } if first_valid.is_none() { bail!("No valid result to merge"); } let (first_rec, first_res) = first_valid.unwrap(); let (rec, res) = ( first_rec, IoCostTuneResult { data, solutions: Default::default(), ..first_res }, ); let dfl_spec = JobSpec::new("iocost-tune", None, None, JobSpec::props(&vec![])); let job = IoCostTuneBench {}.parse(&dfl_spec, None)?; let rec_json = serde_json::to_value(rec)?; let res_json = job.solve(rec_json.clone(), serde_json::to_value(res)?)?; let mut job_data = JobData { spec: dfl_spec, period: merged_period(&srcs), sysinfo: merged_sysinfo(&srcs).unwrap(), record: Some(rec_json), result: Some(res_json), }; job_data.sysinfo.iocost.model.knobs = median_model; Ok(job_data) } #[cfg(test)] mod tests { use super::IoCostModelParams; use std::collections::HashSet; #[test] fn test_iocost_tune_model_merge() { let _ = ::env_logger::try_init(); let srcs: HashSet<IoCostModelParams> = vec![ IoCostModelParams { rbps: 125 << 20, rseqiops: 280, rrandiops: 280, wbps: 125 << 20, wseqiops: 280, wrandiops: 280, }, IoCostModelParams { rbps: 122 << 20, rseqiops: 270, rrandiops: 269, wbps: 126 << 20, wseqiops: 284, wrandiops: 282, }, IoCostModelParams { rbps: 127 << 20, rseqiops: 288, rrandiops: 289, wbps: 122 << 20, wseqiops: 270, wrandiops: 260, }, IoCostModelParams { rbps: 160 << 20, rseqiops: 288, rrandiops: 289, wbps: 122 << 20, wseqiops: 300, wrandiops: 260, }, ] .into_iter() .collect(); let (median_model, model_is_outlier) = super::merge_model(srcs); assert_eq!( median_model, IoCostModelParams { rbps: 125 << 20, rseqiops: 280, rrandiops: 280, wbps: 125 << 20, wseqiops: 280, wrandiops: 280, } ); assert_eq!( model_is_outlier .iter() .fold(0, |acc, (_k, &v)| if v { acc + 1 } else { acc }), 1 ); } }