fn perform_aggregation_cycle()

in nfm-controller/src/events/event_provider_ebpf.rs [91:187]


    fn perform_aggregation_cycle(&mut self, nat_resolver: &Box<dyn NatResolver>) {
        info!("Aggregating across sockets");

        // Apply adaptive sampling if we're receiving events faster than we can process.
        if self.ebpf_counters().map_insertion_errors > 0 {
            self.increase_sampling_interval();
        } else {
            self.decrease_sampling_interval();
        }

        // Retrieve properties of newly initiated sockets.
        let mut new_cpu_sock_keys = HashSet::<CpuSockKey>::new();
        let now_us = self.clock.now_us();
        let context_timestamp = now_us - self.notrack_us / 2;
        let mut sock_add_result = SockOperationResult::default();
        for (cpu_sock_key, sock_context) in self.ebpf_sock_props.iter().flatten() {
            let result =
                self.sock_cache
                    .add_context(cpu_sock_key.sock_key, sock_context, context_timestamp);
            sock_add_result.add(&result);
            if result.failed > 0 {
                break;
            }
            new_cpu_sock_keys.insert(cpu_sock_key);
        }

        // Free the now no-longer needed contexts from BPF.  Thus, cycles are not spent re-reading
        // these each iteration.
        for cpu_sock_key in new_cpu_sock_keys.iter() {
            if self.ebpf_sock_props.remove(cpu_sock_key).is_err() {
                self.process_counters.socket_eviction_failed += 1;
            }
        }

        // Aggregate stats across CPU cores, then take the delta from the previous aggregation cycle.
        SocketQueries::aggregate_sock_stats(
            self.ebpf_sock_stats.iter(),
            &self.sock_cache,
            &mut self.sock_stream,
        );
        let staleness_timestamp = now_us - self.notrack_us;
        let sock_delta_result = self
            .sock_cache
            .update_stats_and_get_deltas(&mut self.sock_stream, staleness_timestamp);

        // Apply beyond-NAT sock properties to any NAT'd sockets.
        let sock_nat_result = nat_resolver.store_beyond_nat_entries(&mut self.sock_cache);

        // Aggregate our delta stats into flows.
        let num_flows_before = self.flow_cache.len();
        let flow_aggregation_result = SocketQueries::aggregate_into_flows(
            &self.sock_stream,
            &self.sock_cache,
            &mut self.flow_cache,
        );
        self.sock_stream.clear();

        // Collect some stats before evicting entries.
        self.agg_socks_handled = self.sock_cache.len().try_into().unwrap();
        let (num_cpus_min, num_cpus_max, num_cpus_avg) = self.sock_cache.num_cpus();

        // Evict sockets.
        let (socks_to_evict, num_stale) = self.sock_cache.perform_eviction();
        let sock_eviction_result = self.perform_bpf_eviction(socks_to_evict);

        // Update counters.
        self.process_counters.sockets_added += sock_add_result.completed;
        self.process_counters.sockets_stale += num_stale;
        self.process_counters.sockets_natd += sock_nat_result.completed;

        self.process_counters.socket_deltas_completed += sock_delta_result.completed;
        self.process_counters.socket_deltas_missing_props += sock_delta_result.partial;
        self.process_counters.socket_deltas_above_limit += sock_delta_result.failed;

        self.process_counters.socket_agg_completed += flow_aggregation_result.completed;
        self.process_counters.socket_agg_missing_props += flow_aggregation_result.partial;
        self.process_counters.socket_agg_above_limit += flow_aggregation_result.failed;

        self.process_counters.socket_eviction_completed += sock_eviction_result.completed;
        self.process_counters.socket_eviction_failed += sock_eviction_result.failed;

        info!(
            sock_add_result:serde,
            sock_delta_result:serde,
            sock_nat_result:serde,
            flow_aggregation_result:serde,
            sock_eviction_result:serde,
            control_data:serde = self.ebpf_control_data,
            sock_cache_len = self.sock_cache.len(),
            flows_before = num_flows_before,
            flows_after = self.flow_cache.len(),
            cpus_per_sock_min = num_cpus_min,
            cpus_per_sock_avg = num_cpus_avg,
            cpus_per_sock_max = num_cpus_max;
            "Aggregation complete"
        );
    }