in nfm-controller/src/lib.rs [186:287]
fn do_work(
mut provider: impl EventProvider,
mut nat_resolver: Box<dyn NatResolver>,
mut event_filter: impl EventFilter<AggregateResults>,
publisher: impl ReportPublisher,
mut metadata_provider: impl MultiMetadataProvider,
mut host_stats_provider: impl HostStatsProvider,
opt: Options,
) {
let memory_inspector = ProcessMemoryInspector::new();
let mut cpu_monitor = CpuUsageMonitor::start();
let mut timer = EventTimer::new(SystemBootClock {});
let aggregate_event = timer.add_event(
Duration::from_millis(opt.aggregate_msecs),
Duration::from_secs(0),
);
let publish_event = timer.add_event(
Duration::from_secs(opt.publish_secs),
Duration::from_secs(opt.jitter_secs),
);
// Register POSIX signals for which we want to exit gracefully.
let should_exit = Arc::new(AtomicBool::new(false));
signal_hook::flag::register(SIGINT, Arc::clone(&should_exit))
.expect("Failed to register SIGINT handler");
signal_hook::flag::register(SIGQUIT, Arc::clone(&should_exit))
.expect("Failed to register SIGQUIT handler");
signal_hook::flag::register(SIGTERM, Arc::clone(&should_exit))
.expect("Failed to register SIGTERM handler");
timer.set_exit_flag(should_exit);
let enable_usage_data = opt.usage_data == OnOff::On;
let mut failed_reports_count = 0;
let mut usage_stats = UsageStats::default();
let mut k8s_metadata_collector = KubernetesMetadataCollector::new();
if opt.kubernetes_metadata == OnOff::On {
k8s_metadata_collector.setup_watchers();
}
loop {
let event_id = timer.await_next_event();
if event_id == event_timer::EXIT_EVENT {
info!("Exiting");
return;
}
if event_id == aggregate_event {
nat_resolver.perform_aggregation_cycle();
provider.perform_aggregation_cycle(&nat_resolver);
nat_resolver.perform_eviction();
} else if event_id == publish_event {
// Add network stats to the report.
let mut report = NfmReport::new();
report.set_failed_reports(failed_reports_count);
let mut agg_flows = provider.network_stats();
if opt.kubernetes_metadata == OnOff::On {
k8s_metadata_collector.enrich_flows(&mut agg_flows);
}
report.set_network_stats(event_filter.filter_events(agg_flows));
// Add process stats to the report.
if enable_usage_data {
// Compute CPU usage then restart the monitor.
usage_stats.cpu_util = cpu_monitor.usage_ratio();
cpu_monitor = CpuUsageMonitor::start();
report.set_process_stats(ProcessStats {
counters: provider.counters(),
usage: vec![usage_stats],
});
// Reset stats for the next report.
usage_stats = UsageStats::default();
}
// Add metadata
metadata_provider.refresh();
report.set_env_metadata(metadata_provider.get_metadata());
host_stats_provider.set_network_devices(&metadata_provider.get_network_devices());
report.set_host_stats(host_stats_provider.get_stats());
if publisher.publish(&report) {
failed_reports_count = 0;
} else {
failed_reports_count += 1;
}
} else {
error!("Received unknown event ID: {event_id}");
}
// We'll publish the highest mem usage during the report period.
if enable_usage_data {
let (mem_used_kb, mem_used_ratio) = memory_inspector.usage();
usage_stats.mem_used_kb = usage_stats.mem_used_kb.max(mem_used_kb);
usage_stats.mem_used_ratio = usage_stats.mem_used_ratio.max(mem_used_ratio);
usage_stats.sockets_tracked = usage_stats.sockets_tracked.max(provider.socket_count());
}
}
}