fn do_work()

in nfm-controller/src/lib.rs [186:287]


/// Runs the controller's main event loop until the timer reports the exit event.
///
/// Two recurring timer events drive the loop:
/// - an aggregation event every `opt.aggregate_msecs` milliseconds, which runs an
///   aggregation cycle on `nat_resolver` and `provider` and then calls
///   `nat_resolver.perform_eviction()`;
/// - a publish event every `opt.publish_secs` seconds (with `opt.jitter_secs` passed as
///   the timer's second argument), which assembles an `NfmReport` and hands it to
///   `publisher`.
///
/// SIGINT, SIGQUIT and SIGTERM set a shared flag that is given to the timer; the loop
/// ends when the timer returns `event_timer::EXIT_EVENT`.
///
/// # Panics
///
/// Panics if any of the three signal handlers cannot be registered.
fn do_work(
    mut provider: impl EventProvider,
    mut nat_resolver: Box<dyn NatResolver>,
    mut event_filter: impl EventFilter<AggregateResults>,
    publisher: impl ReportPublisher,
    mut metadata_provider: impl MultiMetadataProvider,
    mut host_stats_provider: impl HostStatsProvider,
    opt: Options,
) {
    // `opt` is never mutated, so the feature switches can be evaluated once up front.
    let report_usage = opt.usage_data == OnOff::On;
    let k8s_enabled = opt.kubernetes_metadata == OnOff::On;

    let memory_inspector = ProcessMemoryInspector::new();
    let mut cpu_monitor = CpuUsageMonitor::start();
    let mut timer = EventTimer::new(SystemBootClock {});

    let aggregate_period = Duration::from_millis(opt.aggregate_msecs);
    let aggregate_event = timer.add_event(aggregate_period, Duration::ZERO);

    let publish_period = Duration::from_secs(opt.publish_secs);
    let publish_jitter = Duration::from_secs(opt.jitter_secs);
    let publish_event = timer.add_event(publish_period, publish_jitter);

    // POSIX signals that should trigger a graceful exit, each paired with the panic
    // message used if its handler cannot be installed.
    let should_exit = Arc::new(AtomicBool::new(false));
    for (signal, failure_msg) in [
        (SIGINT, "Failed to register SIGINT handler"),
        (SIGQUIT, "Failed to register SIGQUIT handler"),
        (SIGTERM, "Failed to register SIGTERM handler"),
    ] {
        signal_hook::flag::register(signal, Arc::clone(&should_exit)).expect(failure_msg);
    }
    timer.set_exit_flag(should_exit);

    let mut failed_reports_count = 0;
    let mut usage_stats = UsageStats::default();
    let mut k8s_metadata_collector = KubernetesMetadataCollector::new();
    if k8s_enabled {
        k8s_metadata_collector.setup_watchers();
    }

    loop {
        let event_id = timer.await_next_event();

        match event_id {
            _ if event_id == event_timer::EXIT_EVENT => {
                info!("Exiting");
                break;
            }
            _ if event_id == aggregate_event => {
                nat_resolver.perform_aggregation_cycle();
                provider.perform_aggregation_cycle(&nat_resolver);
                nat_resolver.perform_eviction();
            }
            _ if event_id == publish_event => {
                let mut report = NfmReport::new();
                report.set_failed_reports(failed_reports_count);

                // Network stats: optionally enriched with Kubernetes metadata, then
                // passed through the event filter.
                let mut flows = provider.network_stats();
                if k8s_enabled {
                    k8s_metadata_collector.enrich_flows(&mut flows);
                }
                let filtered_flows = event_filter.filter_events(flows);
                report.set_network_stats(filtered_flows);

                // Process stats: only when usage data reporting is switched on.
                if report_usage {
                    // Read the CPU ratio for the elapsed period, then start a fresh
                    // monitor for the next one.
                    usage_stats.cpu_util = cpu_monitor.usage_ratio();
                    cpu_monitor = CpuUsageMonitor::start();

                    report.set_process_stats(ProcessStats {
                        counters: provider.counters(),
                        // Hand the accumulated stats to the report and leave a
                        // default-initialised value behind for the next period.
                        usage: vec![std::mem::replace(
                            &mut usage_stats,
                            UsageStats::default(),
                        )],
                    });
                }

                // Environment metadata, refreshed right before it is read.
                metadata_provider.refresh();
                report.set_env_metadata(metadata_provider.get_metadata());

                // Host stats are gathered for the currently known network devices.
                host_stats_provider.set_network_devices(&metadata_provider.get_network_devices());
                report.set_host_stats(host_stats_provider.get_stats());

                // A successful publish clears the failure streak; a failed one extends it.
                failed_reports_count = if publisher.publish(&report) {
                    0
                } else {
                    failed_reports_count + 1
                };
            }
            _ => {
                error!("Received unknown event ID: {event_id}");
            }
        }

        // Sample after every handled event so the report carries the peak memory usage
        // and socket count seen during the reporting period.
        if report_usage {
            let (sampled_kb, sampled_ratio) = memory_inspector.usage();
            usage_stats.mem_used_kb = usage_stats.mem_used_kb.max(sampled_kb);
            usage_stats.mem_used_ratio = usage_stats.mem_used_ratio.max(sampled_ratio);
            usage_stats.sockets_tracked = usage_stats.sockets_tracked.max(provider.socket_count());
        }
    }
}