in below/src/main.rs [833:1009]
fn record(
init: init::InitToken,
logger: slog::Logger,
errs: Receiver<Error>,
interval: Duration,
below_config: &BelowConfig,
retention: Option<Duration>,
store_size_limit: Option<u64>,
collect_io_stat: bool,
skew_detection_threshold: Duration,
debug: bool,
disable_disk_stat: bool,
disable_exitstats: bool,
enable_gpu_stats: bool,
compress_opts: &CompressOpts,
) -> Result<()> {
debug!(logger, "Starting up!");
if !disable_exitstats {
bump_memlock_rlimit()?;
}
let mut store = store::StoreWriter::new(
logger.clone(),
&below_config.store_dir,
compress_opts.to_compression_mode()?,
store::Format::Cbor,
)?;
let mut stats = statistics::Statistics::new();
let (exit_buffer, bpf_errs) = if disable_exitstats {
(Arc::new(Mutex::new(procfs::PidMap::new())), None)
} else {
start_exitstat(logger.clone(), debug)
};
let mut bpf_err_warned = false;
// Handle cgroup filter from conf and generate Regex
let cgroup_re = if !below_config.cgroup_filter_out.is_empty() {
Some(
Regex::new(&below_config.cgroup_filter_out)
.expect("Failed to generate regex from cgroup_filter_out in below.conf"),
)
} else {
None
};
#[cfg(fbcode_build)]
let gpu_stats_receiver = if enable_gpu_stats {
let thread_interval = interval.clone();
let gpu_collector = model::gpu_stats_collector_plugin::GpuStatsCollectorPlugin::new(
init.fb,
logger.clone(),
)
.context("Failed to initialize GPU stats collector")?;
let (mut collector, receiver) = model::collector_plugin::collector_consumer(gpu_collector);
let logger_clone = logger.clone();
thread::Builder::new()
.name("gpu_collector".to_owned())
.spawn(move || {
loop {
let collect_instant = Instant::now();
match futures::executor::block_on(collector.collect_and_update()) {
Ok(_) => {}
Err(e) => error!(logger_clone, "{:?}", e),
}
let collect_duration = Instant::now().duration_since(collect_instant);
const COLLECT_DURATION_WARN_THRESHOLD: u64 = 2;
if collect_duration > Duration::from_secs(COLLECT_DURATION_WARN_THRESHOLD) {
warn!(
logger_clone,
"GPU collection took {} > {}",
collect_duration.as_secs_f64(),
COLLECT_DURATION_WARN_THRESHOLD
);
}
if thread_interval > collect_duration {
let sleep_duration = thread_interval - collect_duration;
std::thread::sleep(sleep_duration);
}
}
})
.expect("Failed to spawn thread");
Some(receiver)
} else {
None
};
let collector = model::Collector::new(
logger.clone(),
model::CollectorOptions {
cgroup_root: below_config.cgroup_root.clone(),
exit_data: exit_buffer,
collect_io_stat,
disable_disk_stat,
cgroup_re,
#[cfg(fbcode_build)]
gpu_stats_receiver,
},
);
loop {
if !disable_exitstats {
// Anything that comes over the error channel is an error
match errs.try_recv() {
Ok(e) => bail!(e),
Err(TryRecvError::Empty) => {}
Err(TryRecvError::Disconnected) => bail!("error channel disconnected"),
};
if !bpf_err_warned {
bpf_err_warned = check_for_exitstat_errors(
&logger,
bpf_errs
.as_ref()
.expect("Failed to unwrap bpf_errs receiver"),
);
}
}
let collect_instant = Instant::now();
let collected_sample = collector.collect_sample();
let post_collect_sys_time = SystemTime::now();
let post_collect_instant = Instant::now();
let collection_skew = post_collect_instant.duration_since(collect_instant);
if collection_skew >= skew_detection_threshold {
warn!(
logger,
"data collection took {} ms (>= {} ms)",
collection_skew.as_millis(),
skew_detection_threshold.as_millis()
);
stats.report_collection_skew();
}
match collected_sample {
Ok(s) => {
match store.put(post_collect_sys_time, &DataFrame { sample: s }) {
Ok(/* new shard */ true) => {
cleanup_store(&store, &logger, store_size_limit, /* retention */ None)?
}
Ok(/* new shard */ false) => {}
Err(e) => error!(logger, "{:#}", e),
}
}
Err(e) => {
// Handle cgroupfs errors
match e.downcast_ref::<cgroupfs::Error>() {
// Unrecoverable error -- below only supports cgroup2
Some(cgroupfs::Error::NotCgroup2(_)) => bail!(e),
_ => {}
};
error!(logger, "{:#}", e);
}
};
// Only check against retention and not size limit. Size limit is only
// checked on creation of successful write to a new shard.
cleanup_store(&store, &logger, /* store_size_limit */ None, retention)?;
stats.report_store_size(below_config.store_dir.as_path());
let collect_duration = Instant::now().duration_since(collect_instant);
// Sleep for at least 1s to avoid sample collision
let sleep_duration = if interval > collect_duration {
std::cmp::max(Duration::from_secs(1), interval - collect_duration)
} else {
Duration::from_secs(1)
};
std::thread::sleep(sleep_duration);
}
}