resctl-bench/src/bench/iocost_tune/merge.rs (215 lines of code) (raw):
use super::super::*;
use super::{DataSel, DataSeries, IoCostTuneBench, IoCostTuneRecord, IoCostTuneResult};
use statrs::distribution::{ContinuousCDF, Normal};
use std::collections::{BTreeMap, HashMap, HashSet};
fn model_to_array(model: &IoCostModelParams) -> [f64; 6] {
[
model.rbps as f64,
model.rseqiops as f64,
model.rrandiops as f64,
model.wbps as f64,
model.wseqiops as f64,
model.wrandiops as f64,
]
}
/// Builds a model from the first six entries of `array`.
///
/// Slot order matches `model_to_array`: `rbps`, `rseqiops`, `rrandiops`,
/// `wbps`, `wseqiops`, `wrandiops`. Each value is rounded to the nearest
/// integer before being cast to `u64`.
///
/// # Panics
///
/// Panics if `array` holds fewer than six elements.
fn model_from_array(array: &[f64]) -> IoCostModelParams {
    let knob = |slot: usize| -> u64 { array[slot].round() as u64 };
    IoCostModelParams {
        rbps: knob(0),
        rseqiops: knob(1),
        rrandiops: knob(2),
        wbps: knob(3),
        wseqiops: knob(4),
        wrandiops: knob(5),
    }
}
/// Merges a set of distinct models into a single representative model.
///
/// Returns the merged model together with a map that records, for every
/// input model, whether it was flagged as an outlier.
///
/// Outlier detection only runs when there are more than three models. Each of
/// the six parameters is tested independently against a normal distribution
/// built from that parameter's mean and standard deviation, and a model is
/// flagged if any one of its parameters is flagged.
///
/// Each parameter of the merged model is the median of that parameter across
/// the models that were not flagged. For an even number of samples the upper
/// of the two middle values is used.
///
/// Degenerate inputs do not panic:
/// * if every model is flagged, the medians are taken over all models (the
///   returned flags still mark every model as an outlier);
/// * if `models` is empty, every parameter of the returned model is 0.
fn merge_model(
    models: HashSet<IoCostModelParams>,
) -> (IoCostModelParams, HashMap<IoCostModelParams, bool>) {
    // The bool indicates whether the model is an outlier.
    let mut models: Vec<(IoCostModelParams, bool)> =
        models.into_iter().map(|model| (model, false)).collect();
    let nr_models = models.len();

    // Convert to one vector of f64 samples per model parameter. Sample `mi`
    // in every vector belongs to `models[mi]`.
    let mut param_sets: [Vec<f64>; 6] = Default::default();
    for (model, _) in models.iter() {
        for (set, v) in param_sets.iter_mut().zip(model_to_array(model).iter()) {
            set.push(*v);
        }
    }

    // Filter out outliers if there are more than three models.
    if nr_models > 3 {
        let means: Vec<f64> = param_sets
            .iter()
            .map(|set| statistical::mean(set))
            .collect();
        let stdevs: Vec<f64> = param_sets
            .iter()
            .map(|set| statistical::standard_deviation(set, None))
            .collect();
        trace!("merge_model: means={:?} stdevs={:?}", &means, &stdevs);

        // Apply Chauvenet's criterion on each model parameter to detect and
        // reject outliers. We reject models with any parameter determined to be
        // an outlier.
        //
        // NOTE(review): only the upper tail (1 - cdf) is tested, so values
        // far below the mean are never flagged. Confirm this is intentional.
        for (pi, (&mean, &stdev)) in means.iter().zip(stdevs.iter()).enumerate() {
            // Normal::new() fails when the distribution can't be built from
            // these values; such parameters are skipped.
            if let Ok(dist) = Normal::new(mean, stdev) {
                for (mi, &val) in param_sets[pi].iter().enumerate() {
                    let is_outlier = (1.0 - dist.cdf(val)) * (nr_models as f64) < 0.5;
                    trace!(
                        "merge_model: pi={} mean={} stdev={} mi={} val={} is_outlier={}",
                        pi,
                        mean,
                        stdev,
                        mi,
                        val,
                        is_outlier
                    );
                    models[mi].1 |= is_outlier;
                }
            }
        }
    }

    let model_is_outlier: HashMap<IoCostModelParams, bool> = models.into_iter().collect();

    // Gather the per-parameter samples of the models which survived.
    let mut filtered_sets: [Vec<f64>; 6] = Default::default();
    for (model, outlier) in model_is_outlier.iter() {
        if *outlier {
            continue;
        }
        for (set, v) in filtered_sets.iter_mut().zip(model_to_array(model).iter()) {
            set.push(*v);
        }
    }

    // Every model can end up flagged (e.g. each one is high in a different
    // parameter). Fall back to the unfiltered samples so that the median
    // below has something to work with instead of indexing an empty vector.
    if filtered_sets.iter().any(|set| set.is_empty()) {
        filtered_sets = param_sets;
    }

    // Determine the median model parameters.
    for set in filtered_sets.iter_mut() {
        set.sort_by(|a, b| {
            a.partial_cmp(b)
                .expect("model parameters are converted from u64 and are never NaN")
        });
    }
    // `get()` only misses when there were no models at all; use 0 in that case.
    let medians: Vec<f64> = filtered_sets
        .iter()
        .map(|set| set.get(set.len() / 2).copied().unwrap_or(0.0))
        .collect();

    (model_from_array(&medians), model_is_outlier)
}
pub fn merge(srcs: &mut Vec<MergeSrc>) -> Result<JobData> {
// We only care about distinct models. Weed out duplicates using HashSet.
let models: HashSet<IoCostModelParams> = srcs
.iter()
.map(|src| src.data.sysinfo.iocost.model.knobs.clone())
.collect();
let (median_model, model_is_outlier) = merge_model(models);
// Mark outlier sources.
for src in srcs.iter_mut().filter(|src| src.rejected.is_none()) {
if model_is_outlier[&src.data.sysinfo.iocost.model.knobs] {
src.rejected = Some("model is an outlier".to_string());
}
}
let mut first_valid = None;
let mut data = BTreeMap::<DataSel, DataSeries>::default();
for src in srcs.iter_mut().filter(|src| src.rejected.is_none()) {
let (rec, res): (IoCostTuneRecord, IoCostTuneResult) =
match (src.data.parse_record(), src.data.parse_result()) {
(Ok(rec), Ok(res)) => (rec, res),
(Err(e), _) | (_, Err(e)) => {
src.rejected = Some(format!("failed to parse ({:?})", &e));
debug!(
"iocost-tune-merge: {:?} rejected ({})",
&src.file,
src.rejected.as_ref().unwrap()
);
continue;
}
};
match first_valid.as_ref() {
None => first_valid = Some((rec.clone(), res.clone())),
Some((_, fres)) => {
assert_eq!(
(fres.mem_profile, &fres.isol_pct, fres.isol_thr),
(res.mem_profile, &res.isol_pct, res.isol_thr)
);
}
}
for (sel, mut src_ds) in res.data.into_iter() {
let dst_ds = match data.get_mut(&sel) {
Some(ds) => ds,
None => {
data.insert(sel.clone(), DataSeries::default());
data.get_mut(&sel).unwrap()
}
};
dst_ds.points.append(&mut src_ds.points);
dst_ds.outliers.append(&mut src_ds.outliers);
}
}
if first_valid.is_none() {
bail!("No valid result to merge");
}
let (first_rec, first_res) = first_valid.unwrap();
let (rec, res) = (
first_rec,
IoCostTuneResult {
data,
solutions: Default::default(),
..first_res
},
);
let dfl_spec = JobSpec::new("iocost-tune", None, None, JobSpec::props(&vec![]));
let job = IoCostTuneBench {}.parse(&dfl_spec, None)?;
let rec_json = serde_json::to_value(rec)?;
let res_json = job.solve(rec_json.clone(), serde_json::to_value(res)?)?;
let mut job_data = JobData {
spec: dfl_spec,
period: merged_period(&srcs),
sysinfo: merged_sysinfo(&srcs).unwrap(),
record: Some(rec_json),
result: Some(res_json),
};
job_data.sysinfo.iocost.model.knobs = median_model;
Ok(job_data)
}
#[cfg(test)]
mod tests {
    use super::IoCostModelParams;
    use std::collections::HashSet;

    /// Builds a model; the two bandwidth knobs are given in units of `1 << 20`.
    fn model(
        rbps: u64,
        rseqiops: u64,
        rrandiops: u64,
        wbps: u64,
        wseqiops: u64,
        wrandiops: u64,
    ) -> IoCostModelParams {
        IoCostModelParams {
            rbps: rbps << 20,
            rseqiops,
            rrandiops,
            wbps: wbps << 20,
            wseqiops,
            wrandiops,
        }
    }

    /// Merging four models must flag exactly one of them as an outlier and
    /// yield the expected median model.
    #[test]
    fn test_iocost_tune_model_merge() {
        let _ = ::env_logger::try_init();

        let mut srcs: HashSet<IoCostModelParams> = HashSet::new();
        srcs.insert(model(125, 280, 280, 125, 280, 280));
        srcs.insert(model(122, 270, 269, 126, 284, 282));
        srcs.insert(model(127, 288, 289, 122, 270, 260));
        srcs.insert(model(160, 288, 289, 122, 300, 260));

        let (median_model, model_is_outlier) = super::merge_model(srcs);

        assert_eq!(median_model, model(125, 280, 280, 125, 280, 280));

        let nr_outliers = model_is_outlier.values().filter(|&&flagged| flagged).count();
        assert_eq!(nr_outliers, 1);
    }
}