in nfm-controller/src/kubernetes/kubernetes_metadata_collector.rs [603:763]
fn test_enrich_flows_multi_scenario() {
let mut collector = KubernetesMetadataCollector::new();
// Scenario 1: Verify, For a client flow, the remote pod is ALWAYS resolved if we have its pod info.
let pod_local = IpAddr::from_str("10.0.0.1").unwrap();
let pod_remote = IpAddr::from_str("10.0.0.2").unwrap();
let pod_info_local = create_pod_info("p-1", "ns-1", "s-1");
let pod_info_remote = create_pod_info("p-2", "ns-2", "s-2");
let polluter_pod_info = create_pod_info("x", "x", "x");
let pod_info: HashMap<IpAddr, HashMap<i32, PodInfo>> = HashMap::from([(
pod_remote,
HashMap::from([
// register multiple ports under this IP.
// we should still be able to resolve the 6666 one correctly
(6666, pod_info_remote.clone()),
(5555, polluter_pod_info.clone()),
(4444, polluter_pod_info.clone()),
]),
)]);
// create a single client flow towards remote pod port 6666
let mut flows: Vec<AggregateResults> = vec![create_agg_results(
0,
6666,
Some(pod_local),
Some(pod_remote),
)];
collector.pod_info_arc = Arc::new(Mutex::new(pod_info));
assert_eq!(collector.enrich_flows(&mut flows), 1);
assert_eq!(get_flow(flows[0].clone()).remote.unwrap(), pod_info_remote);
// Scenario 2: Verify, For a server flow, the local pod is ALWAYS resolved if we have its pod info.
let pod_info: HashMap<IpAddr, HashMap<i32, PodInfo>> = HashMap::from([(
pod_local,
HashMap::from([
// register multiple ports under this IP.
// we should still be able to resolve the 1111 one correctly
(1111, pod_info_local.clone()),
(2222, polluter_pod_info.clone()),
(3333, polluter_pod_info.clone()),
]),
)]);
// create a single client flow towards our (local) pod port 1111
let mut flows: Vec<AggregateResults> =
vec![create_agg_results(1111, 0, Some(pod_local), None)];
collector.pod_info_arc = Arc::new(Mutex::new(pod_info));
assert_eq!(collector.enrich_flows(&mut flows), 1);
assert_eq!(get_flow(flows[0].clone()).local.unwrap(), pod_info_local);
// Scenario 3: For a client flow, if the local ip IP has a single port registered, local pod can be resolved.
let pod_info: HashMap<IpAddr, HashMap<i32, PodInfo>> = HashMap::from([
(
pod_local,
HashMap::from([
// register a single port under this IP.
// we should still be able to resolve this one correctly in a client flow
// because its the only one that could originate the flow
(1111, pod_info_local.clone()),
]),
),
(
pod_remote,
HashMap::from([
// register multiple ports under this IP.
// we should still be able to resolve the 6666 one correctly
(6666, pod_info_remote.clone()),
(5555, polluter_pod_info.clone()),
(4444, polluter_pod_info.clone()),
]),
),
]);
// create a single client flow towards remote pod port 6666
let mut flows: Vec<AggregateResults> = vec![create_agg_results(
0,
6666,
Some(pod_local),
Some(pod_remote),
)];
collector.pod_info_arc = Arc::new(Mutex::new(pod_info));
assert_eq!(collector.enrich_flows(&mut flows), 1);
assert_eq!(get_flow(flows[0].clone()).local.unwrap(), pod_info_local);
assert_eq!(get_flow(flows[0].clone()).remote.unwrap(), pod_info_remote);
// Scenario 4: For a server flow, if the remote ip IP has a single port registered, remote pod can be resolved.
let pod_info: HashMap<IpAddr, HashMap<i32, PodInfo>> = HashMap::from([
(
pod_local,
HashMap::from([
// register multiple ports under this IP.
// we should still be able to resolve the 1111 one correctly
(1111, pod_info_local.clone()),
(2222, polluter_pod_info.clone()),
(3333, polluter_pod_info.clone()),
]),
),
(
pod_remote,
HashMap::from([
// register a single port under this IP.
// we should still be able to resolve this one correctly in a server flow
// because its the only one that could be on the remote end of flow
(6666, pod_info_remote.clone()),
]),
),
]);
let mut flows: Vec<AggregateResults> = vec![create_agg_results(
1111,
0,
Some(pod_local),
Some(pod_remote),
)];
collector.pod_info_arc = Arc::new(Mutex::new(pod_info));
assert_eq!(collector.enrich_flows(&mut flows), 1);
assert_eq!(get_flow(flows[0].clone()).local.unwrap(), pod_info_local);
assert_eq!(get_flow(flows[0].clone()).remote.unwrap(), pod_info_remote);
// Scenario 5: For a client flow, if the local ip IP has multiple ports registered, we cant resolve the local pod
let pod_info: HashMap<IpAddr, HashMap<i32, PodInfo>> = HashMap::from([
(
pod_local,
HashMap::from([
// register multiple ports under this IP.
// Since this is a client flow,
// there is no way for us to know whether 1111 initiated the connection or 2222 or 3333 etc.
// So we shouldnt resolve local pod under this case
(1111, pod_info_local.clone()),
(2222, polluter_pod_info.clone()),
(3333, polluter_pod_info.clone()),
]),
),
(
pod_remote,
HashMap::from([
// register multiple ports under this IP.
// we should still be able to resolve the 6666 one correctly
(6666, pod_info_remote.clone()),
(5555, polluter_pod_info.clone()),
(4444, polluter_pod_info.clone()),
]),
),
]);
collector.pod_info_arc = Arc::new(Mutex::new(pod_info));
let mut flows: Vec<AggregateResults> = vec![create_agg_results(
0,
6666,
Some(pod_local),
Some(pod_remote),
)];
assert_eq!(collector.enrich_flows(&mut flows), 1);
assert_eq!(get_flow(flows[0].clone()).local, None);
assert_eq!(get_flow(flows[0].clone()).remote.unwrap(), pod_info_remote);
// Scenario 6: For a server flow, if the remote ip IP has multiple ports registered, we cant resolve the remote pod
let mut flows: Vec<AggregateResults> = vec![create_agg_results(
1111,
0,
Some(pod_local),
Some(pod_remote),
)];
assert_eq!(collector.enrich_flows(&mut flows), 1);
assert_eq!(get_flow(flows[0].clone()).local.unwrap(), pod_info_local);
assert_eq!(get_flow(flows[0].clone()).remote, None);
}