in rd-agent/src/report.rs [853:949]
/// Report worker loop: collects iolat output and periodically generates a
/// base report which is fed to both report files.
///
/// Two `IoLatReader` instances are started: "iolat", which reports on its
/// own interval, and "iolat_cum", the cumulative one, which is kicked from
/// here whenever "iolat" produces output. A report is generated once the
/// wall clock passes 500ms into the `next_at` second; after every attempt
/// (successful or not) `next_at` is re-armed to the following second.
///
/// The loop exits when receiving on `self.term_rx` returns an error
/// (presumably because the sending side was dropped).
///
/// # Panics
///
/// Panics if the runner mutex is poisoned, if either `IoLatReader` cannot
/// be created, or if a reader's `rx` is `None` when the loop selects on it.
fn run_inner(mut self) {
    let mut next_at = unix_now() + 1;

    // Hold the runner lock only as long as needed to read the config.
    let runner = self.runner.data.lock().unwrap();
    let cfg = &runner.cfg;
    let mut iolat = IoLatReader::new(cfg, "iolat", "1").unwrap();
    let mut iolat_cum = IoLatReader::new(cfg, "iolat_cum", "-1").unwrap();
    drop(runner);

    // Zero on the first pass so that the timeout arm fires immediately and
    // the real sleep duration gets computed below.
    let mut sleep_dur = Duration::from_secs(0);
    let mut iolat_retries = crate::misc::BCC_RETRIES;
    let mut iolat_cum_retries = crate::misc::BCC_RETRIES;
    let mut iolat_cum_kicked_at = UNIX_EPOCH;

    'outer: loop {
        select! {
            recv(iolat.rx.as_ref().unwrap()) -> res => {
                // the cumulative instance doesn't have an interval,
                // kick it and run it at the same pace as the 1s one. If
                // we stalled for a while, we may busy loop here kicking
                // iolat_cum repeatedly causing the python signal
                // handler to hit maximum recursion limit and fail.
                // Don't kick in quick succession.
                let now = SystemTime::now();
                match now.duration_since(iolat_cum_kicked_at) {
                    Ok(dur) => {
                        if dur.as_secs_f64() > 0.1 {
                            iolat_cum.kick();
                            iolat_cum_kicked_at = now;
                        }
                    }
                    // The clock went backwards relative to the last kick;
                    // resync the timestamp without kicking.
                    Err(_) => iolat_cum_kicked_at = now,
                }
                match res {
                    Ok(line) => {
                        match Self::parse_iolat_output(&line) {
                            Ok(v) => self.iolat = v,
                            Err(e) => warn!("report: failed to parse iolat output ({:?})", &e),
                        }
                    }
                    Err(e) => Self::maybe_retry_iolat(&mut iolat_retries, &mut iolat, &e),
                }
            },
            recv(iolat_cum.rx.as_ref().unwrap()) -> res => {
                match res {
                    Ok(line) => {
                        match Self::parse_iolat_output(&line) {
                            Ok(v) => self.iolat_cum = v,
                            Err(e) => warn!("report: failed to parse iolat_cum output ({:?})", &e),
                        }
                    }
                    Err(e) => Self::maybe_retry_iolat(&mut iolat_cum_retries, &mut iolat_cum, &e),
                }
            },
            recv(self.term_rx) -> term => {
                if let Err(e) = term {
                    info!("report: Term ({})", &e);
                    break 'outer;
                }
            },
            recv(channel::after(sleep_dur)) -> _ => (),
        }

        // Whatever woke us up, only proceed once we are 500ms into the
        // target second; otherwise go back to waiting for the remainder.
        let sleep_till = UNIX_EPOCH + Duration::from_secs(next_at) + Duration::from_millis(500);
        if let Ok(remaining) = sleep_till.duration_since(SystemTime::now()) {
            sleep_dur = remaining;
            trace!("report: Sleeping {}ms", sleep_dur.as_millis());
            continue 'outer;
        }

        // base_report() generation may take some time. Timestamp here.
        let now = unix_now();

        // generate base
        let base_report = match self.base_report() {
            Ok(v) => v,
            Err(e) => {
                error!("report: Failed to generate base report ({:?})", &e);
                // Re-arm the deadline on failure too. Otherwise it stays in
                // the past and every subsequent wake-up (iolat output or
                // the stale `sleep_dur` timeout) immediately retries,
                // potentially spinning and flooding the log.
                next_at = unix_now() + 1;
                continue;
            }
        };

        self.report_file.tick(&base_report, now);
        self.report_file_1min.tick(&base_report, now);

        // Report generation and writing could have taken a while. If we
        // overshot the 500ms window and are in the next second window,
        // we wanna wait for the next window. Re-acquire the current
        // second to determine when the next report will be generated.
        next_at = unix_now() + 1;
    }
}