rd-agent/src/report.rs (858 lines of code) (raw):

// Copyright (c) Facebook, Inc. and its affiliates. use anyhow::{anyhow, bail, Result}; use chrono::prelude::*; use crossbeam::channel::{self, select, Receiver, Sender}; use enum_iterator::IntoEnumIterator; use log::{debug, error, info, trace, warn}; use nix::sys::signal::{kill, Signal}; use nix::unistd::Pid; use scan_fmt::scan_fmt; use std::collections::{BTreeMap, HashMap}; use std::ffi::OsStr; use std::fs; use std::io::prelude::*; use std::io::BufReader; use std::os::unix::fs::symlink; use std::panic; use std::process::{Child, Command, Stdio}; use std::thread::{spawn, JoinHandle}; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use super::cmd::Runner; use super::Config; use rd_agent_intf::{ report::StatMap, BenchHashdReport, BenchIoCostReport, HashdReport, IoCostReport, IoLatReport, Report, ResCtlReport, Slice, UsageReport, ROOT_SLICE, }; use rd_util::*; #[derive(Debug, Default)] struct Usage { cpu_busy: f64, cpu_sys: f64, mem_bytes: u64, swap_bytes: u64, swap_free: u64, io_rbytes: u64, io_wbytes: u64, io_usage: u64, cpu_stalls: (f64, f64), mem_stalls: (f64, f64), io_stalls: (f64, f64), mem_stat: StatMap, io_stat: StatMap, } fn read_stalls(path: &str) -> Result<(f64, f64)> { let f = fs::OpenOptions::new().read(true).open(path)?; let r = BufReader::new(f); let (mut some, mut full) = (None, None); for line in r.lines().filter_map(|x| x.ok()) { if let Ok((which, v)) = scan_fmt!( &line, "{} avg10={*f} avg60={*f} avg300={*f} total={d}", String, u64 ) { match (which.as_ref(), v) { ("some", v) => some = Some(v as f64 / 1_000_000.0), ("full", v) => full = Some(v as f64 / 1_000_000.0), _ => {} } } } Ok((some.unwrap_or(0.0), full.unwrap_or(0.0))) } fn read_stat_file(path: &str) -> Result<StatMap> { let map = read_cgroup_flat_keyed_file(path)?; Ok(map.iter().map(|(k, v)| (k.clone(), *v as f64)).collect()) } fn read_system_usage(devnr: (u32, u32)) -> Result<(Usage, f64)> { let kstat = procfs::KernelStats::new()?; let cpu = &kstat.total; let mut cpu_total = cpu.user as f64 + cpu.nice as f64 + cpu.system as f64 + cpu.idle as f64 + cpu.iowait.unwrap() as f64 + cpu.irq.unwrap() as f64 + cpu.softirq.unwrap() as f64 + cpu.steal.unwrap() as f64 + cpu.guest.unwrap() as f64 + cpu.guest_nice.unwrap() as f64; let mut cpu_busy = cpu_total - cpu.idle as f64 - cpu.iowait.unwrap() as f64; let tps = procfs::ticks_per_second()? as f64; let cpu_sys = cpu.system as f64 / tps; cpu_busy /= tps; cpu_total /= tps; let mstat = procfs::Meminfo::new()?; let mem_bytes = mstat.mem_total - mstat.mem_free; let swap_bytes = mstat.swap_total - mstat.swap_free; let mut io_rbytes = 0; let mut io_wbytes = 0; for dstat in linux_proc::diskstats::DiskStats::from_system()?.iter() { if dstat.major == devnr.0 as u64 && dstat.minor == devnr.1 as u64 { io_rbytes = dstat.sectors_read * 512; io_wbytes = dstat.sectors_written * 512; } } let mem_stat_path = "/sys/fs/cgroup/memory.stat"; let mem_stat = match read_stat_file(&mem_stat_path) { Ok(v) => v, Err(e) => { debug!("report: Failed to read {} ({:?})", &mem_stat_path, &e); Default::default() } }; let mut io_usage = 0; let mut io_stat = Default::default(); if let Ok(mut is) = read_cgroup_nested_keyed_file("/sys/fs/cgroup/io.stat") { if let Some(is) = is.remove(&format!("{}:{}", devnr.0, devnr.1)) { if let Some(val) = is.get("cost.usage") { io_usage = scan_fmt!(&val, "{}", u64).unwrap_or(0); } io_stat = is .into_iter() .map(|(k, v)| (k, v.parse::<f64>().unwrap_or(0.0))) .collect(); } } Ok(( Usage { cpu_busy, cpu_sys, mem_bytes, swap_bytes, swap_free: mstat.swap_free, io_rbytes, io_wbytes, io_usage, mem_stat, io_stat, cpu_stalls: read_stalls("/proc/pressure/cpu")?, mem_stalls: read_stalls("/proc/pressure/memory")?, io_stalls: read_stalls("/proc/pressure/io")?, }, cpu_total, )) } fn read_swap_free(cgrp: &str) -> Result<u64> { if !cgrp.starts_with("/sys/fs/cgroup/") { bail!("cgroup path doesn't start with /sys/fs/cgroup"); } // Walk up the hierarchy and take the min. We should expose this in // memory.stat from kernel side eventually. let mut free = procfs::Meminfo::new()?.swap_free; let mut path = std::path::PathBuf::from(cgrp); while path != std::path::Path::new("/sys/fs/cgroup") { path.push("memory.swap.max"); let max = match read_one_line(path.to_str().unwrap()) .unwrap_or("max".to_owned()) .as_str() { "max" => std::u64::MAX, line => scan_fmt!(line, "{}", u64)?, }; path.pop(); path.push("memory.swap.current"); let cur = scan_fmt!( &read_one_line(path.to_str().unwrap()).unwrap_or("0".to_owned()), "{}", u64 )?; free = free.min(max.saturating_sub(cur)); path.pop(); path.pop(); } Ok(free) } fn read_cgroup_usage(cgrp: &str, devnr: (u32, u32)) -> Usage { let mut usage: Usage = Default::default(); if let Ok(cs) = read_cgroup_flat_keyed_file(&(cgrp.to_string() + "/cpu.stat")) { if let Some(v) = cs.get("usage_usec") { usage.cpu_busy = *v as f64 / 1_000_000.0; } if let Some(v) = cs.get("system_usec") { usage.cpu_sys = *v as f64 / 1_000_000.0; } } if let Ok(line) = read_one_line(&(cgrp.to_string() + "/memory.current")) { if let Ok(v) = scan_fmt!(&line, "{}", u64) { usage.mem_bytes = v; } } if let Ok(line) = read_one_line(&(cgrp.to_string() + "/memory.swap.current")) { if let Ok(v) = scan_fmt!(&line, "{}", u64) { usage.swap_bytes = v; } } if let Ok(v) = read_swap_free(cgrp) { usage.swap_free = v; } let mem_stat_path = cgrp.to_string() + "/memory.stat"; usage.mem_stat = match read_stat_file(&mem_stat_path) { Ok(v) => v, Err(e) => { debug!("report: Failed to read {} ({:?})", &mem_stat_path, &e); Default::default() } }; if let Ok(mut is) = read_cgroup_nested_keyed_file(&(cgrp.to_string() + "/io.stat")) { if let Some(is) = is.remove(&format!("{}:{}", devnr.0, devnr.1)) { if let Some(val) = is.get("rbytes") { usage.io_rbytes = scan_fmt!(&val, "{}", u64).unwrap_or(0); } if let Some(val) = is.get("wbytes") { usage.io_wbytes = scan_fmt!(&val, "{}", u64).unwrap_or(0); } if let Some(val) = is.get("cost.usage") { usage.io_usage = scan_fmt!(&val, "{}", u64).unwrap_or(0); } usage.io_stat = is .into_iter() .map(|(k, v)| (k, v.parse::<f64>().unwrap_or(0.0))) .collect(); } } if let Ok(v) = read_stalls(&(cgrp.to_string() + "/cpu.pressure")) { usage.cpu_stalls = v; } if let Ok(v) = read_stalls(&(cgrp.to_string() + "/memory.pressure")) { usage.mem_stalls = v; } if let Ok(v) = read_stalls(&(cgrp.to_string() + "/io.pressure")) { usage.io_stalls = v; } usage } pub struct UsageTracker { devnr: (u32, u32), at: Instant, cpu_total: f64, usages: HashMap<String, Usage>, runner: Runner, } impl UsageTracker { fn new(devnr: (u32, u32), runner: Runner) -> Self { let mut us = Self { devnr, at: Instant::now(), cpu_total: 0.0, usages: HashMap::new(), runner, }; us.usages.insert(ROOT_SLICE.into(), Default::default()); for slice in Slice::into_enum_iter() { us.usages.insert(slice.name().into(), Default::default()); } if let Err(e) = us.update() { warn!("report: Failed to update usages ({:?})", &e); } us } fn read_usages(&self) -> Result<(HashMap<String, Usage>, f64)> { let mut usages = HashMap::new(); let (us, cpu_total) = read_system_usage(self.devnr)?; usages.insert(ROOT_SLICE.into(), us); for slice in Slice::into_enum_iter() { usages.insert( slice.name().to_string(), read_cgroup_usage(slice.cgrp(), self.devnr), ); } let all_svcs = self.runner.data.lock().unwrap().all_svcs(); for (svc, cgrp) in all_svcs.into_iter() { usages.insert(svc, read_cgroup_usage(&cgrp, self.devnr)); } Ok((usages, cpu_total)) } fn update(&mut self) -> Result<BTreeMap<String, UsageReport>> { let mut reps = BTreeMap::new(); let now = Instant::now(); let (usages, cpu_total) = self.read_usages()?; let dur = now.duration_since(self.at).as_secs_f64(); let zero_usage = Usage::default(); for (unit, cur) in usages.iter() { let mut rep: UsageReport = Default::default(); let last = self.usages.get(unit).unwrap_or(&zero_usage); let cpu_total_delta = cpu_total - self.cpu_total; if cpu_total_delta > 0.0 { rep.cpu_util = ((cur.cpu_busy - last.cpu_busy) / cpu_total_delta) .min(1.0) .max(0.0); rep.cpu_sys = ((cur.cpu_sys - last.cpu_sys) / cpu_total_delta) .min(1.0) .max(0.0); } rep.cpu_usage = cur.cpu_busy; rep.cpu_usage_sys = cur.cpu_sys; rep.cpu_usage_base = cpu_total; rep.mem_bytes = cur.mem_bytes; rep.swap_bytes = cur.swap_bytes; rep.swap_free = cur.swap_free; rep.io_rbytes = cur.io_rbytes; rep.io_wbytes = cur.io_wbytes; if dur > 0.0 { if cur.io_rbytes >= last.io_rbytes { rep.io_rbps = ((cur.io_rbytes - last.io_rbytes) as f64 / dur).round() as u64; } if cur.io_wbytes >= last.io_wbytes { rep.io_wbps = ((cur.io_wbytes - last.io_wbytes) as f64 / dur).round() as u64; } rep.io_util = (cur.io_usage - last.io_usage) as f64 / 1_000_000.0 / dur; rep.io_usage = cur.io_usage as f64 / 1_000_000.0; rep.cpu_stalls = cur.cpu_stalls; rep.mem_stalls = cur.mem_stalls; rep.io_stalls = cur.io_stalls; rep.cpu_pressures = ( ((cur.cpu_stalls.0 - last.cpu_stalls.0) / dur) .min(1.0) .max(0.0), ((cur.cpu_stalls.1 - last.cpu_stalls.1) / dur) .min(1.0) .max(0.0), ); rep.mem_pressures = ( ((cur.mem_stalls.0 - last.mem_stalls.0) / dur) .min(1.0) .max(0.0), ((cur.mem_stalls.1 - last.mem_stalls.1) / dur) .min(1.0) .max(0.0), ); rep.io_pressures = ( ((cur.io_stalls.0 - last.io_stalls.0) / dur) .min(1.0) .max(0.0), ((cur.io_stalls.1 - last.io_stalls.1) / dur) .min(1.0) .max(0.0), ); } reps.insert(unit.into(), rep); } self.at = now; self.cpu_total = cpu_total; self.usages = usages; Ok(reps) } } struct ReportFile { intv: u64, retention: Option<u64>, path: String, d_path: String, next_at: u64, usage_tracker: UsageTracker, hashd_acc: [HashdReport; 2], mem_stat_acc: BTreeMap<String, StatMap>, io_stat_acc: BTreeMap<String, StatMap>, vmstat_acc: StatMap, iolat_acc: IoLatReport, iocost_acc: IoCostReport, nr_samples: u32, } pub fn clear_old_report_files(d_path: &str, retention: Option<u64>, now: u64) -> Result<()> { for path in fs::read_dir(d_path)? .filter_map(|x| x.ok()) .map(|x| x.path()) { if retention.is_none() { return Ok(()); } let name = path .file_name() .unwrap_or_else(|| OsStr::new("")) .to_str() .unwrap_or(""); let stamp = match scan_fmt!(name, "{d}.json", u64) { Ok(v) => v, Err(_) => continue, }; if stamp < now - retention.unwrap() { if let Err(e) = fs::remove_file(&path) { warn!( "report: Failed to remove stale report {:?} ({:?})", &path, &e ); } else { debug!("report: Removed stale report {:?}", &path); } } } Ok(()) } impl ReportFile { fn new( intv: u64, retention: Option<u64>, path: &str, d_path: &str, devnr: (u32, u32), runner: Runner, ) -> ReportFile { let now = unix_now(); let rf = Self { intv, retention, path: path.into(), d_path: d_path.into(), next_at: ((now / intv) + 1) * intv, usage_tracker: UsageTracker::new(devnr, runner), hashd_acc: Default::default(), mem_stat_acc: Default::default(), io_stat_acc: Default::default(), vmstat_acc: Default::default(), iolat_acc: Default::default(), iocost_acc: Default::default(), nr_samples: 0, }; if let Err(e) = clear_old_report_files(d_path, retention, now) { warn!("report: Failed to clear stale report files ({:?})", &e); } rf } fn acc_stat_map(lhs: &mut StatMap, rhs: &StatMap) { for (rhs_k, rhs_v) in rhs.iter() { match lhs.get_mut(rhs_k) { Some(lhs_v) => *lhs_v += rhs_v, None => { lhs.insert(rhs_k.to_owned(), *rhs_v); } } } } fn acc_slice_stat_map(lhs: &mut BTreeMap<String, StatMap>, rhs: &BTreeMap<String, StatMap>) { for (rhs_slice, rhs_map) in rhs.iter() { match lhs.get_mut(rhs_slice) { Some(lhs_map) => Self::acc_stat_map(lhs_map, rhs_map), None => { lhs.insert(rhs_slice.to_owned(), rhs_map.clone()); } } } } fn div_stat_map(lhs: &mut StatMap, div: f64) { for (_, v) in lhs.iter_mut() { *v /= div; } } fn div_slice_stat_map(lhs: &mut BTreeMap<String, StatMap>, div: f64) { for (_, map) in lhs.iter_mut() { Self::div_stat_map(map, div); } } fn tick(&mut self, base_report: &Report, now: u64) { for i in 0..2 { self.hashd_acc[i] += &base_report.hashd[i]; } Self::acc_slice_stat_map(&mut self.mem_stat_acc, &base_report.mem_stat); Self::acc_slice_stat_map(&mut self.io_stat_acc, &base_report.io_stat); Self::acc_stat_map(&mut self.vmstat_acc, &base_report.vmstat); self.iolat_acc.accumulate(&base_report.iolat); self.iocost_acc += &base_report.iocost; self.nr_samples += 1; if now < self.next_at { return; } trace!("report: Reporting {}s summary at {}", self.intv, now); let was_at = self.next_at - self.intv; self.next_at = (now / self.intv + 1) * self.intv; // fill in report let report_path = format!("{}/{}.json", &self.d_path, now / self.intv * self.intv); let mut report_file = JsonReportFile::<Report>::new(Some(&report_path)); report_file.data = base_report.clone(); let report = &mut report_file.data; for i in 0..2 { self.hashd_acc[i] /= self.nr_samples; report.hashd[i] = HashdReport { svc: report.hashd[i].svc.clone(), phase: report.hashd[i].phase, ..self.hashd_acc[i].clone() }; } self.hashd_acc = Default::default(); Self::div_slice_stat_map(&mut self.mem_stat_acc, self.nr_samples as f64); Self::div_slice_stat_map(&mut self.io_stat_acc, self.nr_samples as f64); Self::div_stat_map(&mut self.vmstat_acc, self.nr_samples as f64); std::mem::swap(&mut report.mem_stat, &mut self.mem_stat_acc); std::mem::swap(&mut report.io_stat, &mut self.io_stat_acc); std::mem::swap(&mut report.vmstat, &mut self.vmstat_acc); self.mem_stat_acc.clear(); self.io_stat_acc.clear(); self.vmstat_acc.clear(); report.iolat = self.iolat_acc.clone(); self.iolat_acc = Default::default(); self.iocost_acc /= self.nr_samples; report.iocost = self.iocost_acc.clone(); self.iocost_acc = Default::default(); self.nr_samples = 0; report.usages = match self.usage_tracker.update() { Ok(v) => v, Err(e) => { warn!("report: Failed to update {}s usages ({:?})", self.intv, &e); return; } }; for slice in &[ROOT_SLICE, Slice::Work.name(), Slice::Sys.name()] { if let Some(usage) = self.usage_tracker.usages.get(&slice.to_string()) { report .mem_stat .insert(slice.to_string(), usage.mem_stat.clone()); report .io_stat .insert(slice.to_string(), usage.io_stat.clone()); } } match read_stat_file("/proc/vmstat") { Ok(map) => report.vmstat = map, Err(e) => warn!("report: Failed to read vmstat ({:?})", &e), } // write out to the unix timestamped file if let Err(e) = report_file.commit() { warn!("report: Failed to write {}s summary ({:?})", self.intv, &e); } // symlink the current report file let staging_path = format!("{}.staging", &self.path); let _ = fs::remove_file(&staging_path); if let Err(e) = symlink(&report_path, &staging_path) { warn!( "report: Failed to symlink {:?} to {:?} ({:?})", &report_path, &staging_path, &e ); } if let Err(e) = fs::rename(&staging_path, &self.path) { warn!( "report: Failed to move {:?} to {:?} ({:?})", &staging_path, &self.path, &e ); } // delete expired ones if let Some(retention) = self.retention { for i in was_at..now { let path = format!("{}/{}.json", &self.d_path, i - retention); trace!("report: Removing expired {:?}", &path); let _ = fs::remove_file(&path); } } } } struct IoLatReader { biolatpcts_bin: Option<String>, devnr: (u32, u32), name: String, intv: String, tx: Option<Sender<String>>, rx: Option<Receiver<String>>, child: Option<Child>, jh: Option<JoinHandle<()>>, } impl IoLatReader { fn start_iolat( biolatpcts_bin: &str, devnr: (u32, u32), name: &str, intv: &str, tx: Sender<String>, ) -> Result<(Child, JoinHandle<()>)> { let mut child = Command::new(biolatpcts_bin) .arg(format!("{}:{}", devnr.0, devnr.1)) .args(&["-i", intv, "--json"]) .arg("-p") .arg( IoLatReport::PCTS .iter() .map(|x| format!("{}", x)) .collect::<Vec<String>>() .join(","), ) .stdout(Stdio::piped()) .spawn()?; let name = name.to_string(); let stdout = child.stdout.take().unwrap(); let jh = spawn(move || child_reader_thread(name, stdout, tx)); Ok((child, jh)) } fn reset(&mut self) -> Result<()> { self.disconnect(); let (tx, rx) = channel::unbounded::<String>(); self.rx = Some(rx); if self.biolatpcts_bin.is_some() { let (child, jh) = Self::start_iolat( self.biolatpcts_bin.as_ref().unwrap(), self.devnr, &self.name, &self.intv, tx, )?; self.child = Some(child); self.jh = Some(jh); } else { self.tx = Some(tx); } Ok(()) } fn new(cfg: &Config, name: &str, intv: &str) -> Result<Self> { let mut iolat = Self { biolatpcts_bin: cfg.biolatpcts_bin.as_ref().map(|x| x.to_owned()), devnr: cfg.scr_devnr, name: name.to_owned(), intv: intv.to_owned(), tx: None, rx: None, child: None, jh: None, }; iolat.reset()?; Ok(iolat) } fn kick(&self) { if self.child.is_some() { kill( Pid::from_raw(self.child.as_ref().unwrap().id() as i32), Signal::SIGUSR2, ) .unwrap(); } } fn disconnect(&mut self) { self.tx.take(); self.rx.take(); if self.child.is_some() { let _ = self.child.as_mut().unwrap().kill(); let _ = self.child.as_mut().unwrap().wait(); self.jh.take().unwrap().join().unwrap(); } } } impl Drop for IoLatReader { fn drop(&mut self) { self.disconnect(); } } struct ReportWorker { runner: Runner, term_rx: Receiver<()>, report_file: ReportFile, report_file_1min: ReportFile, iolat: IoLatReport, iolat_cum: IoLatReport, iocost_devnr: (u32, u32), } impl ReportWorker { pub fn new(runner: Runner, term_rx: Receiver<()>) -> Result<Self> { let rdata = runner.data.lock().unwrap(); // ReportFile init may try to lock runner. Fetch all the needed data // and unlock it. let cfg = &rdata.cfg; let scr_devnr = cfg.scr_devnr; let (rep_ret, rep_path, rep_d_path) = ( cfg.rep_retention, cfg.report_path.clone(), cfg.report_d_path.clone(), ); let (rep_1min_ret, rep_1min_path, rep_1min_d_path) = ( cfg.rep_1min_retention, cfg.report_1min_path.clone(), cfg.report_1min_d_path.clone(), ); drop(rdata); Ok(Self { term_rx, report_file: ReportFile::new( 1, rep_ret, &rep_path, &rep_d_path, scr_devnr, runner.clone(), ), report_file_1min: ReportFile::new( 60, rep_1min_ret, &rep_1min_path, &rep_1min_d_path, scr_devnr, runner.clone(), ), iolat: Default::default(), iolat_cum: Default::default(), iocost_devnr: scr_devnr, runner, }) } fn base_report(&mut self) -> Result<Report> { let now = SystemTime::now(); let expiration = now - Duration::from_secs(3); let mut runner = self.runner.data.lock().unwrap(); let hashd = runner.hashd_set.report(expiration)?; let (bench_hashd, bench_hashd_phase) = match runner.bench_hashd.as_mut() { Some(svc) => ( super::svc_refresh_and_report(&mut svc.unit)?, hashd[0].phase, ), None => (Default::default(), Default::default()), }; let bench_iocost = match runner.bench_iocost.as_mut() { Some(svc) => super::svc_refresh_and_report(&mut svc.unit)?, None => Default::default(), }; let seq = super::instance_seq(); let dseqs = &runner.sobjs.slice_file.data.disable_seqs; let resctl = ResCtlReport { cpu: dseqs.cpu < seq, mem: dseqs.mem < seq, io: dseqs.io < seq, }; Ok(Report { timestamp: DateTime::from(now), seq: super::instance_seq(), state: runner.state, resctl, oomd: runner.sobjs.oomd.report()?, sideloader: runner.sobjs.sideloader.report()?, bench_hashd: BenchHashdReport { svc: bench_hashd, phase: bench_hashd_phase, mem_probe_size: hashd[0].mem_probe_size, mem_probe_at: hashd[0].mem_probe_at, }, bench_iocost: BenchIoCostReport { svc: bench_iocost }, hashd, sysloads: runner.side_runner.report_sysloads()?, sideloads: runner.side_runner.report_sideloads()?, iolat: self.iolat.clone(), iolat_cum: self.iolat_cum.clone(), iocost: IoCostReport::read(self.iocost_devnr)?, swappiness: read_swappiness()?, zswap_enabled: read_zswap_enabled()?, ..Default::default() }) } fn parse_iolat_output(line: &str) -> Result<IoLatReport> { let parsed = json::parse(line)?; let mut iolat_map = IoLatReport::default(); for key in &["read", "write", "discard", "flush"] { let key = key.to_string(); let iolat = iolat_map .map .get_mut(&key) .ok_or_else(|| anyhow!("{:?} missing in iolat output {:?}", &key, line))?; for (k, v) in parsed[&key].entries() { let v = v .as_f64() .ok_or_else(|| anyhow!("failed to parse latency from {:?}", &line))?; if iolat.insert(k.to_string(), v).is_none() { panic!( "report: {:?}:{:?} -> {:?} was missing in the template", key, k, v, ); } } } Ok(iolat_map) } fn maybe_retry_iolat(retries: &mut u32, iolat: &mut IoLatReader, e: &dyn std::error::Error) { if *retries > 0 && !prog_exiting() { *retries -= 1; warn!("report: iolat reader thread failed ({:?}), retrying...", e); iolat.reset().unwrap(); } else { error!("report: iolat reader thread failed ({:?}), giving up", e); panic!(); } } fn run_inner(mut self) { let mut next_at = unix_now() + 1; let runner = self.runner.data.lock().unwrap(); let cfg = &runner.cfg; let mut iolat = IoLatReader::new(cfg, "iolat", "1").unwrap(); let mut iolat_cum = IoLatReader::new(cfg, "iolat_cum", "-1").unwrap(); drop(runner); let mut sleep_dur = Duration::from_secs(0); let mut iolat_retries = crate::misc::BCC_RETRIES; let mut iolat_cum_retries = crate::misc::BCC_RETRIES; let mut iolat_cum_kicked_at = UNIX_EPOCH; 'outer: loop { select! { recv(iolat.rx.as_ref().unwrap()) -> res => { // the cumulative instance doesn't have an interval, // kick it and run it at the same pace as the 1s one. If // we stalled for a while, we may busy loop here kicking // iolat_cum repeatedly causing the python signal // handler to hit maximum recursion limit and fail. // Don't kick in quick succession. let now = SystemTime::now(); match now.duration_since(iolat_cum_kicked_at) { Ok(dur) => { if dur.as_secs_f64() > 0.1 { iolat_cum.kick(); iolat_cum_kicked_at = now; } } Err(_) => iolat_cum_kicked_at = now, } match res { Ok(line) => { match Self::parse_iolat_output(&line) { Ok(v) => self.iolat = v, Err(e) => warn!("report: failed to parse iolat output ({:?})", &e), } } Err(e) => Self::maybe_retry_iolat(&mut iolat_retries, &mut iolat, &e), } }, recv(iolat_cum.rx.as_ref().unwrap()) -> res => { match res { Ok(line) => { match Self::parse_iolat_output(&line) { Ok(v) => self.iolat_cum = v, Err(e) => warn!("report: failed to parse iolat_cum output ({:?})", &e), } } Err(e) => Self::maybe_retry_iolat(&mut iolat_cum_retries, &mut iolat_cum, &e), } }, recv(self.term_rx) -> term => { if let Err(e) = term { info!("report: Term ({})", &e); break 'outer; } }, recv(channel::after(sleep_dur)) -> _ => (), } let sleep_till = UNIX_EPOCH + Duration::from_secs(next_at) + Duration::from_millis(500); match sleep_till.duration_since(SystemTime::now()) { Ok(v) => { sleep_dur = v; trace!("report: Sleeping {}ms", sleep_dur.as_millis()); continue 'outer; } _ => {} } // base_report() generation may take some time. Timestamp here. let now = unix_now(); // generate base let base_report = match self.base_report() { Ok(v) => v, Err(e) => { error!("report: Failed to generate base report ({:?})", &e); continue; } }; self.report_file.tick(&base_report, now); self.report_file_1min.tick(&base_report, now); // Report generation and writing could have taken a while. If we // overshot the 500ms window and are in the next second window, // we wanna wait for the next window. Re-acquire the current // second to determine when the next report will be generated. next_at = unix_now() + 1; } } pub fn run(self) { if let Err(e) = panic::catch_unwind(panic::AssertUnwindSafe(|| self.run_inner())) { error!("report: worker thread panicked ({:?})", &e); set_prog_exiting(); } } } pub struct Reporter { term_tx: Option<Sender<()>>, join_handle: Option<JoinHandle<()>>, } impl Reporter { pub fn new(runner: Runner) -> Result<Self> { let (term_tx, term_rx) = channel::unbounded::<()>(); let worker = ReportWorker::new(runner, term_rx)?; let jh = spawn(|| worker.run()); Ok(Self { term_tx: Some(term_tx), join_handle: Some(jh), }) } } impl Drop for Reporter { fn drop(&mut self) { let term_tx = self.term_tx.take().unwrap(); drop(term_tx); let jh = self.join_handle.take().unwrap(); jh.join().unwrap(); } }