in rd-agent/src/report.rs [492:601]
/// Accumulates one sample from `base_report` and, once `now` has reached
/// `self.next_at`, emits a summary report for the elapsed period.
///
/// Every call feeds the hashd, memory, IO, vmstat, iolat and iocost figures
/// of `base_report` into the running accumulators and bumps `nr_samples`.
/// If `now < self.next_at` nothing else happens. Otherwise the call also:
///
/// 1. advances `self.next_at` to the next multiple of `self.intv` after `now`,
/// 2. builds a report from a clone of `base_report`, replacing the sampled
///    fields with the accumulated ones after passing them through the
///    `/=` / `div_*` helpers with the sample count (`iolat` is taken as
///    accumulated, without such a step),
/// 3. resets all accumulators and `nr_samples`,
/// 4. refreshes `usages` via `self.usage_tracker` and overrides the
///    `ROOT_SLICE`, `Slice::Work` and `Slice::Sys` entries of `mem_stat` /
///    `io_stat` as well as `vmstat` with the freshly obtained values,
/// 5. commits the report to `<d_path>/<T>.json`, where `T` is `now` rounded
///    down to a multiple of `self.intv`, and re-points `self.path` at it
///    through a staged symlink plus rename,
/// 6. if `self.retention` is set, removes the files named after each
///    timestamp in `was_at..now` shifted back by the retention.
///
/// Failures are logged with `warn!` and never propagated. A failure to
/// update the usages aborts the report for this period (see the note at the
/// early return below).
fn tick(&mut self, base_report: &Report, now: u64) {
    for i in 0..2 {
        self.hashd_acc[i] += &base_report.hashd[i];
    }
    Self::acc_slice_stat_map(&mut self.mem_stat_acc, &base_report.mem_stat);
    Self::acc_slice_stat_map(&mut self.io_stat_acc, &base_report.io_stat);
    Self::acc_stat_map(&mut self.vmstat_acc, &base_report.vmstat);
    self.iolat_acc.accumulate(&base_report.iolat);
    self.iocost_acc += &base_report.iocost;
    self.nr_samples += 1;

    if now < self.next_at {
        return;
    }

    trace!("report: Reporting {}s summary at {}", self.intv, now);

    // Start of the period being reported; used below as the lower bound of
    // the expiry sweep. NOTE(review): relies on `next_at >= intv`, which
    // holds for every value assigned in this function — confirm the
    // initial value set by the constructor.
    let was_at = self.next_at - self.intv;
    // `now` rounded down to a multiple of the interval; names the file.
    let period_at = now / self.intv * self.intv;
    self.next_at = (now / self.intv + 1) * self.intv;

    // fill in report
    let report_path = format!("{}/{}.json", &self.d_path, period_at);
    let mut report_file = JsonReportFile::<Report>::new(Some(&report_path));
    report_file.data = base_report.clone();
    let report = &mut report_file.data;

    // `nr_samples` was incremented above, so it is at least 1 here.
    let nr_samples = self.nr_samples;

    for i in 0..2 {
        self.hashd_acc[i] /= nr_samples;
        // Keep `svc` and `phase` from the latest sample; every other field
        // comes from the accumulator.
        report.hashd[i] = HashdReport {
            svc: report.hashd[i].svc.clone(),
            phase: report.hashd[i].phase,
            ..self.hashd_acc[i].clone()
        };
    }
    self.hashd_acc = Default::default();

    Self::div_slice_stat_map(&mut self.mem_stat_acc, nr_samples as f64);
    Self::div_slice_stat_map(&mut self.io_stat_acc, nr_samples as f64);
    Self::div_stat_map(&mut self.vmstat_acc, nr_samples as f64);

    // Move the accumulated maps into the report without copying. After the
    // swap the accumulators hold the maps cloned from `base_report`, so
    // they are cleared to start the next period empty.
    std::mem::swap(&mut report.mem_stat, &mut self.mem_stat_acc);
    std::mem::swap(&mut report.io_stat, &mut self.io_stat_acc);
    std::mem::swap(&mut report.vmstat, &mut self.vmstat_acc);
    self.mem_stat_acc.clear();
    self.io_stat_acc.clear();
    self.vmstat_acc.clear();

    // Move the accumulated values out and leave the accumulators at their
    // default value in one step instead of clone-then-reset.
    report.iolat = std::mem::take(&mut self.iolat_acc);
    self.iocost_acc /= nr_samples;
    report.iocost = std::mem::take(&mut self.iocost_acc);

    self.nr_samples = 0;

    report.usages = match self.usage_tracker.update() {
        Ok(v) => v,
        Err(e) => {
            warn!("report: Failed to update {}s usages ({:?})", self.intv, &e);
            // NOTE: the accumulators have already been reset and `next_at`
            // advanced, so the summary for this period is dropped.
            return;
        }
    };

    // Override the per-slice stats of the tracked slices with the values
    // held by the usage tracker.
    for slice in &[ROOT_SLICE, Slice::Work.name(), Slice::Sys.name()] {
        let key = slice.to_string();
        if let Some(usage) = self.usage_tracker.usages.get(&key) {
            report.mem_stat.insert(key.clone(), usage.mem_stat.clone());
            report.io_stat.insert(key, usage.io_stat.clone());
        }
    }

    // On success this replaces the accumulated vmstat swapped in above; on
    // failure the accumulated values are kept.
    match read_stat_file("/proc/vmstat") {
        Ok(map) => report.vmstat = map,
        Err(e) => warn!("report: Failed to read vmstat ({:?})", &e),
    }

    // write out to the unix timestamped file
    if let Err(e) = report_file.commit() {
        warn!("report: Failed to write {}s summary ({:?})", self.intv, &e);
    }

    // symlink the current report file: create the link under a staging
    // name first, then rename it over `self.path`.
    let staging_path = format!("{}.staging", &self.path);
    // Best effort: a leftover staging link may or may not exist.
    let _ = fs::remove_file(&staging_path);
    if let Err(e) = symlink(&report_path, &staging_path) {
        warn!(
            "report: Failed to symlink {:?} to {:?} ({:?})",
            &report_path, &staging_path, &e
        );
    }
    if let Err(e) = fs::rename(&staging_path, &self.path) {
        warn!(
            "report: Failed to move {:?} to {:?} ({:?})",
            &staging_path, &self.path, &e
        );
    }

    // delete expired ones
    if let Some(retention) = self.retention {
        // Timestamps smaller than `retention` cannot name an expired file;
        // skip them instead of underflowing the subtraction.
        for expired in (was_at..now).filter_map(|ts| ts.checked_sub(retention)) {
            let path = format!("{}/{}.json", &self.d_path, expired);
            trace!("report: Removing expired {:?}", &path);
            // Best effort: most of these files are not expected to exist.
            let _ = fs::remove_file(&path);
        }
    }
}