in nfm-controller/src/events/event_provider_ebpf.rs [91:187]
fn perform_aggregation_cycle(&mut self, nat_resolver: &dyn NatResolver) {
info!("Aggregating across sockets");
// Apply adaptive sampling if we're receiving events faster than we can process them:
// back off when the BPF maps report insertion errors, and recover otherwise (a
// hypothetical sketch of these helpers appears after the function).
if self.ebpf_counters().map_insertion_errors > 0 {
self.increase_sampling_interval();
} else {
self.decrease_sampling_interval();
}
// Retrieve properties of newly initiated sockets.
let mut new_cpu_sock_keys = HashSet::<CpuSockKey>::new();
let now_us = self.clock.now_us();
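// Stamp new contexts half a no-track window in the past: an entry that is never
// refreshed then goes stale after half of the usual window rather than a full one.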
let context_timestamp = now_us - self.notrack_us / 2;
let mut sock_add_result = SockOperationResult::default();
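// Note: flatten() drops entries whose read failed (iter() is assumed to yield Results).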
for (cpu_sock_key, sock_context) in self.ebpf_sock_props.iter().flatten() {
let result =
self.sock_cache
.add_context(cpu_sock_key.sock_key, sock_context, context_timestamp);
sock_add_result.add(&result);
if result.failed > 0 {
// The cache could not accept this context; stop here so it, and the remaining
// contexts, stay in the BPF map to be retried on the next cycle.
break;
}
// Remember the key so its now-cached context can be freed from the BPF map below.
new_cpu_sock_keys.insert(cpu_sock_key);
}
// Free the now-unneeded contexts from the BPF map, so cycles are not spent
// re-reading them on every aggregation pass.
for cpu_sock_key in new_cpu_sock_keys.iter() {
if self.ebpf_sock_props.remove(cpu_sock_key).is_err() {
self.process_counters.socket_eviction_failed += 1;
}
}
// Aggregate stats across CPU cores, then take the delta from the previous aggregation cycle.
SocketQueries::aggregate_sock_stats(
self.ebpf_sock_stats.iter(),
&self.sock_cache,
&mut self.sock_stream,
);
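// Entries whose stats were not refreshed within the no-track window count as stale.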
let staleness_timestamp = now_us - self.notrack_us;
let sock_delta_result = self
.sock_cache
.update_stats_and_get_deltas(&mut self.sock_stream, staleness_timestamp);
// Apply beyond-NAT sock properties to any NAT'd sockets.
let sock_nat_result = nat_resolver.store_beyond_nat_entries(&mut self.sock_cache);
// Aggregate our delta stats into flows.
let num_flows_before = self.flow_cache.len();
let flow_aggregation_result = SocketQueries::aggregate_into_flows(
&self.sock_stream,
&self.sock_cache,
&mut self.flow_cache,
);
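// The per-cycle stream has been folded into flows; reset it for the next cycle.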
self.sock_stream.clear();
// Collect some stats before evicting entries.
self.agg_socks_handled = self.sock_cache.len().try_into().unwrap();
let (num_cpus_min, num_cpus_max, num_cpus_avg) = self.sock_cache.num_cpus();
// Evict sockets from the userspace cache, then remove the same entries from the BPF maps.
let (socks_to_evict, num_stale) = self.sock_cache.perform_eviction();
let sock_eviction_result = self.perform_bpf_eviction(socks_to_evict);
// Update counters.
self.process_counters.sockets_added += sock_add_result.completed;
self.process_counters.sockets_stale += num_stale;
self.process_counters.sockets_natd += sock_nat_result.completed;
self.process_counters.socket_deltas_completed += sock_delta_result.completed;
self.process_counters.socket_deltas_missing_props += sock_delta_result.partial;
self.process_counters.socket_deltas_above_limit += sock_delta_result.failed;
self.process_counters.socket_agg_completed += flow_aggregation_result.completed;
self.process_counters.socket_agg_missing_props += flow_aggregation_result.partial;
self.process_counters.socket_agg_above_limit += flow_aggregation_result.failed;
self.process_counters.socket_eviction_completed += sock_eviction_result.completed;
self.process_counters.socket_eviction_failed += sock_eviction_result.failed;
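// Log a structured summary of the cycle.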
info!(
sock_add_result:serde,
sock_delta_result:serde,
sock_nat_result:serde,
flow_aggregation_result:serde,
sock_eviction_result:serde,
control_data:serde = self.ebpf_control_data,
sock_cache_len = self.sock_cache.len(),
flows_before = num_flows_before,
flows_after = self.flow_cache.len(),
cpus_per_sock_min = num_cpus_min,
cpus_per_sock_avg = num_cpus_avg,
cpus_per_sock_max = num_cpus_max;
"Aggregation complete"
);
}
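
For reference, a hypothetical reconstruction of `SockOperationResult`, inferred purely from
how it is used above (`completed`/`partial`/`failed` tallies, an `add` accumulator, and
`:serde` logging); the real definition lives elsewhere in the crate and may differ:

// Hypothetical sketch -- fields and types inferred from usage, not the crate's actual code.
#[derive(Default, serde::Serialize)]
struct SockOperationResult {
    completed: u64, // operations that fully succeeded
    partial: u64,   // operations that succeeded but were missing some properties
    failed: u64,    // operations rejected, e.g. because a limit was hit
}

impl SockOperationResult {
    // Fold another result into this running total.
    fn add(&mut self, other: &Self) {
        self.completed += other.completed;
        self.partial += other.partial;
        self.failed += other.failed;
    }
}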
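
Similarly, a minimal self-contained sketch of the back-off/recovery logic behind
increase_sampling_interval/decrease_sampling_interval; the struct, bounds, step, and policy
below are invented for illustration (the real helpers operate on the provider itself):

// Hypothetical sketch of adaptive sampling: back off multiplicatively under pressure,
// recover gradually once BPF map insertions stop failing.
struct Sampler {
    interval_us: u64, // current sampling interval, in microseconds
}

impl Sampler {
    const MIN_US: u64 = 1_000;     // invented lower bound
    const MAX_US: u64 = 1_000_000; // invented upper bound
    const STEP_US: u64 = 1_000;    // invented recovery step

    // Called when the BPF maps report insertion errors: sample less often.
    fn increase_sampling_interval(&mut self) {
        self.interval_us = self
            .interval_us
            .saturating_mul(2)
            .clamp(Self::MIN_US, Self::MAX_US);
    }

    // Called after a clean cycle: gradually sample more often again.
    fn decrease_sampling_interval(&mut self) {
        self.interval_us = self
            .interval_us
            .saturating_sub(Self::STEP_US)
            .clamp(Self::MIN_US, Self::MAX_US);
    }
}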