in nfm-controller/src/events/event_provider_ebpf.rs [378:433]
fn aggregate_into_flows(
sock_stats_deltas: &HashMap<SockKey, AggSockStats>,
sock_cache: &SockCache,
flow_cache: &mut HashMap<FlowProperties, AggregateResults>,
) -> SockOperationResult {
// Reset the counts of sockets per state.
for flow in flow_cache.values_mut() {
flow.stats.clear_levels();
}
let mut result = SockOperationResult::default();
let flow_limit: usize = AGG_FLOWS_MAX_ENTRIES.try_into().unwrap();
for (sock_key, agg_stats) in sock_stats_deltas.iter() {
let sock_wrapper = match sock_cache.get(sock_key) {
Some(ctx) => ctx,
None => {
result.failed += 1;
continue;
}
};
let context_beyond_nat = sock_wrapper
.context_external
.unwrap_or(sock_wrapper.context);
if let Ok(flow_props) = FlowProperties::try_from(&context_beyond_nat) {
let flow_count = flow_cache.len();
let flow_entry = flow_cache.entry(flow_props.clone());
let flow_agg = match flow_entry {
Entry::Occupied(o) => o.into_mut(),
Entry::Vacant(v) => {
if flow_count < flow_limit {
// When below the flow limit, track new flows.
v.insert(AggregateResults {
flow: flow_props,
stats: NetworkStats::default(),
})
} else {
// When at the flow limit, move on to the next socket.
result.failed += 1;
continue;
}
}
};
flow_agg.stats.add_from(&agg_stats.stats);
if sock_wrapper.should_evict() {
flow_agg.stats.sockets_completed += 1;
}
result.completed += 1;
} else {
result.partial += 1;
}
}
result
}