in nfm-controller/src/events/event_provider_ebpf.rs [795:862]
/// Verifies that flow aggregation uses a socket's `context_external` (the view
/// beyond local NAT) when one has been recorded.
///
/// `AGG_FLOWS_MAX_ENTRIES` sockets are registered with identical contexts, each is
/// then given the same external context, and the aggregation is expected to report
/// every socket as completed while collapsing them into exactly one flow.
fn test_flow_aggregation_with_nat() {
    let timestamp_us: u64 = 2025;

    // Addresses used throughout the test: the remote seen before NAT is applied,
    // and the local/remote pair reported by the external (post-NAT) context.
    let pre_nat_remote_bits = Ipv4Addr::from_str("4.3.2.1").unwrap().to_bits();
    let nat_local = "10.6.7.8";
    let nat_remote = "44.33.22.11";
    let nat_local_bits = Ipv4Addr::from_str(nat_local).unwrap().to_bits();
    let nat_remote_bits = Ipv4Addr::from_str(nat_remote).unwrap().to_bits();

    let mut sock_cache = SockCache::with_max_entries(AGG_FLOWS_MAX_ENTRIES as usize);
    let mut sock_stream: HashMap<SockKey, AggSockStats> = HashMap::new();

    // Register many sockets that should all end up in the same flow; each one
    // contributes a single received byte.
    for idx in 0..AGG_FLOWS_MAX_ENTRIES {
        let key = idx as SockKey;
        sock_cache.add_context(
            key,
            SockContext {
                address_family: AF_INET,
                remote_ipv4: pre_nat_remote_bits,
                ..Default::default()
            },
            timestamp_us,
        );

        let one_byte_received = AggSockStats {
            stats: SockStats {
                bytes_received: 1,
                ..Default::default()
            },
            cpus: vec![],
        };
        sock_stream.insert(key, one_byte_received);
    }

    // Attach the same NAT result to every cached socket.
    for (_sock_key, entry) in sock_cache.iter_mut() {
        entry.context_external = Some(SockContext {
            local_ipv4: nat_local_bits,
            remote_ipv4: nat_remote_bits,
            local_port: 22,
            remote_port: 2938,
            address_family: AF_INET,
            is_client: false,
            ..Default::default()
        });
    }

    // Run the aggregation; every socket must be reported as completed.
    let mut flow_cache: HashMap<FlowProperties, AggregateResults> = HashMap::new();
    let outcome =
        SocketQueries::aggregate_into_flows(&sock_stream, &sock_cache, &mut flow_cache);
    let expected_outcome = SockOperationResult {
        completed: AGG_FLOWS_MAX_ENTRIES.into(),
        partial: 0,
        failed: 0,
    };
    assert_eq!(outcome, expected_outcome);

    // The single resulting flow must carry the post-NAT addresses.
    // NOTE(review): remote_port is expected to be 0 although the external context
    // sets 2938 -- presumably the ephemeral peer port is dropped for non-client
    // sockets; confirm against aggregate_into_flows.
    let expected_flow = FlowProperties {
        protocol: InetProtocol::TCP,
        local_address: IpAddr::from_str(nat_local).unwrap(),
        remote_address: IpAddr::from_str(nat_remote).unwrap(),
        local_port: 22,
        remote_port: 0,
        kubernetes_metadata: None,
    };
    assert_eq!(flow_cache.len(), 1);
    flow_cache
        .keys()
        .for_each(|flow| assert_eq!(*flow, expected_flow));
}