fn aggregate_into_flows()

in nfm-controller/src/events/event_provider_ebpf.rs [378:433]


    fn aggregate_into_flows(
        sock_stats_deltas: &HashMap<SockKey, AggSockStats>,
        sock_cache: &SockCache,
        flow_cache: &mut HashMap<FlowProperties, AggregateResults>,
    ) -> SockOperationResult {
        // Reset the counts of sockets per state.
        for flow in flow_cache.values_mut() {
            flow.stats.clear_levels();
        }

        let mut result = SockOperationResult::default();
        let flow_limit: usize = AGG_FLOWS_MAX_ENTRIES.try_into().unwrap();
        for (sock_key, agg_stats) in sock_stats_deltas.iter() {
            let sock_wrapper = match sock_cache.get(sock_key) {
                Some(ctx) => ctx,
                None => {
                    result.failed += 1;
                    continue;
                }
            };

            let context_beyond_nat = sock_wrapper
                .context_external
                .unwrap_or(sock_wrapper.context);
            if let Ok(flow_props) = FlowProperties::try_from(&context_beyond_nat) {
                let flow_count = flow_cache.len();
                let flow_entry = flow_cache.entry(flow_props.clone());
                let flow_agg = match flow_entry {
                    Entry::Occupied(o) => o.into_mut(),
                    Entry::Vacant(v) => {
                        if flow_count < flow_limit {
                            // When below the flow limit, track new flows.
                            v.insert(AggregateResults {
                                flow: flow_props,
                                stats: NetworkStats::default(),
                            })
                        } else {
                            // When at the flow limit, move on to the next socket.
                            result.failed += 1;
                            continue;
                        }
                    }
                };

                flow_agg.stats.add_from(&agg_stats.stats);
                if sock_wrapper.should_evict() {
                    flow_agg.stats.sockets_completed += 1;
                }
                result.completed += 1;
            } else {
                result.partial += 1;
            }
        }

        result
    }