nfm-common/src/sock_ops_handler.rs (981 lines of code) (raw):
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
use crate::*;
pub type SockOpsResult = Result<(), SockOpsResultCode>;
#[derive(Debug, PartialEq)]
pub enum SockOpsResultCode {
OperationUnknown,
ContextInvalid,
MapInsertionError,
SetCbFlagsError,
RttInvalid,
SampleDiscard,
}
// Convey's the values of user-space control knobs to BPF.
pub struct BpfControlConveyor {
#[cfg(not(feature = "bpf"))]
pub mock_ebpf_maps: MockEbpfMaps,
}
impl BpfControlConveyor {
pub fn should_handle_event(&self, event_cb_id: u32) -> bool {
if self.is_new_sock_event(event_cb_id) {
self.should_handle_new_sock()
} else {
// Note that we only sample on letting newly connected sockets into the front door.
// For sockets we're already tracking we don't want to miss any events.
true
}
}
fn is_new_sock_event(&self, event_cb_id: u32) -> bool {
matches!(
event_cb_id,
BPF_SOCK_OPS_TCP_CONNECT_CB | BPF_SOCK_OPS_PASSIVE_ESTABLISHED_CB
)
}
fn should_handle_new_sock(&self) -> bool {
let control_option = bpf_map_get!(self, NFM_CONTROL, &SINGLETON_KEY);
if let Some(control_data) = control_option {
let sampling_interval = control_data.sampling_interval;
sampling_interval <= 1 || bpf_get_rand_u32!(self) % sampling_interval == 0
} else {
true
}
}
}
pub struct TcpSockOpsHandler<'a> {
ctx: &'a SockOpsContext,
now_us: u64,
counters: EventCounters,
composite_key: CpuSockKey,
#[cfg(not(feature = "bpf"))]
pub mock_ebpf_maps: Option<&'a mut MockEbpfMaps>,
}
impl<'a> TcpSockOpsHandler<'a> {
pub fn new(ctx: &'a SockOpsContext, now_us: u64) -> Self {
TcpSockOpsHandler {
ctx,
now_us,
counters: Default::default(),
composite_key: Default::default(),
#[cfg(not(feature = "bpf"))]
mock_ebpf_maps: None,
}
}
pub fn handle_socket_event(&mut self) -> SockOpsResult {
self.counters.socket_events += 1;
self.composite_key.sock_key = nfm_get_sock_cookie(self.ctx);
self.composite_key.cpu_id = nfm_get_cpu_id();
let result = match self.ctx.op() {
BPF_SOCK_OPS_TCP_CONNECT_CB => self.handle_connect(),
BPF_SOCK_OPS_PASSIVE_ESTABLISHED_CB => self.handle_passive_established(),
BPF_SOCK_OPS_ACTIVE_ESTABLISHED_CB => Ok(()), // No-op.
BPF_SOCK_OPS_STATE_CB => self.handle_state_change(),
BPF_SOCK_OPS_RTT_CB => self.handle_rtt(),
BPF_SOCK_OPS_RETRANS_CB => self.handle_retransmit(),
BPF_SOCK_OPS_RTO_CB => self.handle_rto(),
BPF_SOCK_OPS_PARSE_HDR_OPT_CB => self.handle_bytes_transferred_event(),
BPF_SOCK_OPS_HDR_OPT_LEN_CB => self.handle_bytes_transferred_event(),
_ => {
self.counters.other_events += 1;
Err(SockOpsResultCode::OperationUnknown)
}
};
// Persist our new counter contributions.
unsafe {
match bpf_map_get_ptr_mut!(self, NFM_COUNTERS, &SINGLETON_KEY) {
Some(persisted_counters) => {
(*persisted_counters).add_from(&self.counters);
}
None => {
let _ = bpf_map_insert!(
self,
NFM_COUNTERS,
&SINGLETON_KEY,
&self.counters,
BPF_ANY
);
}
};
}
result
}
fn handle_connect(&mut self) -> SockOpsResult {
self.counters.active_connect_events += 1;
const IS_CLIENT: bool = true;
self.handle_new_sock(IS_CLIENT)
}
fn handle_passive_established(&mut self) -> SockOpsResult {
self.counters.passive_established_events += 1;
const IS_CLIENT: bool = false;
self.handle_new_sock(IS_CLIENT)
}
fn handle_new_sock(&mut self, is_client: bool) -> SockOpsResult {
let sock_context = SockContext::from_sock_ops(self.ctx, is_client);
if !sock_context.is_valid() {
self.counters.sockets_invalid += 1;
return Err(SockOpsResultCode::ContextInvalid);
}
let res = bpf_map_insert!(
self,
NFM_SK_PROPS,
&self.composite_key,
&sock_context,
BPF_NOEXIST
);
if res.is_err() {
self.counters.map_insertion_errors += 1;
return Err(SockOpsResultCode::MapInsertionError);
}
// Register to receive events only for successfully recorded sockets, meaning after success
// of the above map insertion.
self.configure_flags()?;
match self.get_or_add_sock_stats() {
Ok(stats_raw) => unsafe {
let sock_stats = &mut *stats_raw;
sock_stats.connect_start_us = self.now_us;
if is_client {
sock_stats.connect_attempts += 1;
}
if nfm_get_sock_state(self.ctx) == BPF_TCP_ESTABLISHED {
sock_stats
.state_flags
.insert(SockStateFlags::ENTERED_ESTABLISH);
}
Ok(())
},
Err(e) => Err(e),
}
}
fn handle_state_change(&mut self) -> SockOpsResult {
self.counters.state_change_events += 1;
// This callback is called before the state is actually changed, so the arguments need to
// be used instead of the ctx.state.
let old_state = self.ctx.arg(0);
let new_state = self.ctx.arg(1);
let mut new_flags = SockStateFlags::empty();
match self.get_or_add_sock_stats() {
Ok(stats_raw) => unsafe {
let sock_stats = &mut *stats_raw;
if new_state == BPF_TCP_ESTABLISHED {
new_flags |= SockStateFlags::ENTERED_ESTABLISH;
sock_stats.connect_end_us = self.now_us;
if old_state == BPF_TCP_SYN_SENT {
self.counters.active_established_events += 1;
sock_stats.connect_successes += 1;
}
} else if !matches!(new_state, BPF_TCP_SYN_SENT | BPF_TCP_SYN_RECV) {
new_flags |= SockStateFlags::STARTED_CLOSURE;
if matches!(old_state, BPF_TCP_SYN_SENT | BPF_TCP_SYN_RECV) {
new_flags |= SockStateFlags::TERMINATED_FROM_SYN;
}
if new_state == BPF_TCP_CLOSE {
new_flags |= SockStateFlags::CLOSED;
// Store some final stats on connection close.
let event_stats = nfm_get_sock_ops_stats(self.ctx);
sock_stats.bytes_received = event_stats.bytes_received;
sock_stats.bytes_delivered = event_stats.bytes_acked;
sock_stats.segments_received = event_stats.segments_received;
sock_stats.segments_delivered = event_stats.segments_delivered;
if old_state == BPF_TCP_ESTABLISHED {
new_flags |= SockStateFlags::TERMINATED_FROM_EST;
}
}
}
sock_stats.state_flags.insert(new_flags);
Ok(())
},
Err(e) => Err(e),
}
}
fn handle_rtt(&mut self) -> SockOpsResult {
self.counters.rtt_events += 1;
// A recent Linux enhancement [a,b] supplies the most recently measured and smoothed RTT
// values as arguments to this BPF callback. If we do not see those, we fallback to a less
// timely value on the sock-ops struct and increment a counter.
//
// [a] https://git.kernel.org/pub/scm/linux/kernel/git/bpf/bpf-next.git/commit/?id=48e2cd3e3dcf
// [b] https://code.amazon.com/reviews/CR-144526532/
let event_stats = nfm_get_sock_ops_stats(self.ctx);
let rtt_measured_us = match self.ctx.arg(0) {
0 => {
self.counters.rtts_invalid += 1;
event_stats.srtt_us
}
x => x,
};
let rtt_smoothed_us = match self.ctx.arg(1) {
0 => event_stats.srtt_us,
x => x >> 3,
};
match self.get_or_add_sock_stats() {
Ok(stats_raw) => unsafe {
let sock_stats = &mut *stats_raw;
sock_stats.rtt_count += 1;
sock_stats.rtt_latest_us = rtt_measured_us;
sock_stats.rtt_smoothed_us = rtt_smoothed_us;
// Take this opporunity to update bytes transferred as well.
sock_stats.bytes_received = event_stats.bytes_received;
sock_stats.bytes_delivered = event_stats.bytes_acked;
sock_stats.segments_received = event_stats.segments_received;
sock_stats.segments_delivered = event_stats.segments_delivered;
Ok(())
},
Err(e) => Err(e),
}
}
fn handle_retransmit(&mut self) -> SockOpsResult {
self.counters.retrans_events += 1;
// Args are: [seq-num, num-segs, tx-errno].
let retrans_segments = self.ctx.arg(1);
match self.get_or_add_sock_stats() {
Ok(stats_raw) => unsafe {
let sock_stats = &mut *stats_raw;
match nfm_get_sock_state(self.ctx) {
BPF_TCP_ESTABLISHED => sock_stats.retrans_est += retrans_segments,
BPF_TCP_SYN_SENT => {
sock_stats.retrans_syn += retrans_segments;
sock_stats.connect_attempts += retrans_segments as u8;
}
BPF_TCP_SYN_RECV => sock_stats.retrans_syn += retrans_segments,
_ => sock_stats.retrans_close += retrans_segments,
};
Ok(())
},
Err(e) => Err(e),
}
}
fn handle_rto(&mut self) -> SockOpsResult {
self.counters.rto_events += 1;
match self.get_or_add_sock_stats() {
Ok(stats_raw) => unsafe {
let sock_stats = &mut *stats_raw;
match nfm_get_sock_state(self.ctx) {
BPF_TCP_ESTABLISHED => sock_stats.rtos_est += 1,
BPF_TCP_SYN_SENT | BPF_TCP_SYN_RECV => sock_stats.rtos_syn += 1,
_ => sock_stats.rtos_close += 1,
};
Ok(())
},
Err(e) => Err(e),
}
}
fn handle_bytes_transferred_event(&mut self) -> SockOpsResult {
match self.get_or_add_sock_stats() {
Ok(stats_raw) => unsafe {
// Update running stats.
let sock_stats = &mut *stats_raw;
let event_stats = nfm_get_sock_ops_stats(self.ctx);
sock_stats.bytes_received = event_stats.bytes_received;
sock_stats.bytes_delivered = event_stats.bytes_acked;
sock_stats.segments_received = event_stats.segments_received;
sock_stats.segments_delivered = event_stats.segments_delivered;
Ok(())
},
Err(e) => Err(e),
}
}
fn get_or_add_sock_stats(&mut self) -> Result<*mut SockStats, SockOpsResultCode> {
match bpf_map_get_ptr_mut!(self, NFM_SK_STATS, &self.composite_key) {
Some(stats_raw) => unsafe {
let sock_stats = &mut *stats_raw;
sock_stats.last_touched_us = self.now_us;
Ok(sock_stats)
},
None => {
let new_stats = SockStats {
last_touched_us: self.now_us,
..Default::default()
};
match bpf_map_insert!(
self,
NFM_SK_STATS,
&self.composite_key,
&new_stats,
BPF_NOEXIST
) {
Ok(_) => match bpf_map_get_ptr_mut!(self, NFM_SK_STATS, &self.composite_key) {
Some(stats) => Ok(stats),
None => {
self.counters.map_insertion_errors += 1;
Err(SockOpsResultCode::MapInsertionError)
}
},
Err(_) => {
self.counters.map_insertion_errors += 1;
Err(SockOpsResultCode::MapInsertionError)
}
}
}
}
}
fn configure_flags(&mut self) -> SockOpsResult {
// Tell the kernel which events we want to receive for the current socket.
match self.ctx.set_cb_flags(
(BPF_SOCK_OPS_RTT_CB_FLAG
| BPF_SOCK_OPS_RTO_CB_FLAG
| BPF_SOCK_OPS_STATE_CB_FLAG
| BPF_SOCK_OPS_RETRANS_CB_FLAG
| BPF_SOCK_OPS_PARSE_ALL_HDR_OPT_CB_FLAG
// The following provides us with events for non-data egress packets (such as RSTs
// and dup-ACKs). Without this, a socket can be evicted before being marked as
// severed within the flow.
| BPF_SOCK_OPS_WRITE_HDR_OPT_CB_FLAG) as i32,
) {
Ok(_) => Ok(()),
Err(_code) => {
self.counters.set_flags_errors += 1;
Err(SockOpsResultCode::SetCbFlagsError)
}
}
}
}
#[cfg(all(test, not(feature = "bpf")))]
mod test {
use super::*;
const NO_COOKIE: u64 = 0;
fn run_sock_ops_test(
cookie: u64,
op_code: u32,
mock_ebpf_maps: &mut MockEbpfMaps,
ktime_us: u64,
expectation: SockOpsResult,
) {
let no_args: [u32; 2] = [0, 0];
run_sock_ops_test_with_args(
cookie,
BPF_TCP_SYN_SENT,
op_code,
no_args,
mock_ebpf_maps,
ktime_us,
expectation,
);
}
fn run_sock_ops_test_with_args(
cookie: u64,
sock_state: u32,
op_code: u32,
args: [u32; 2],
mock_ebpf_maps: &mut MockEbpfMaps,
ktime_us: u64,
expectation: SockOpsResult,
) {
let ctx = SockOpsContext {
cookie,
sock_state,
op: op_code,
args,
family: AF_INET,
..Default::default()
};
// Handle the event.
let mut handler = TcpSockOpsHandler::new(&ctx, ktime_us);
handler.mock_ebpf_maps = Some(mock_ebpf_maps);
let result = handler.handle_socket_event();
// Validate initial results.
assert_eq!(result, expectation);
if expectation.is_ok() {
let composite_key = CpuSockKey {
sock_key: cookie,
cpu_id: MOCK_CPU_ID,
};
assert_eq!(
mock_ebpf_maps.sock_stats(&composite_key).last_touched_us,
ktime_us
);
}
}
#[test]
fn test_ebpf_op_invalid() {
let mock_ktime_us: u64 = 0;
let mut mock_ebpf_maps = MockEbpfMaps::new();
const INVALID_OP: u32 = 0;
run_sock_ops_test(
NO_COOKIE,
INVALID_OP,
&mut mock_ebpf_maps,
mock_ktime_us,
Err(SockOpsResultCode::OperationUnknown),
);
}
#[test]
fn test_ebpf_op_valid() {
let mock_ktime_us: u64 = 0;
let mut mock_ebpf_maps = MockEbpfMaps::new();
let cookie: u64 = 211;
run_sock_ops_test(
cookie,
BPF_SOCK_OPS_TCP_CONNECT_CB,
&mut mock_ebpf_maps,
mock_ktime_us,
Ok(()),
);
}
#[test]
fn test_ebpf_context_invalid() {
// Prep our test data.
let mock_ktime_us: u64 = 0;
let mut mock_ebpf_maps = MockEbpfMaps::new();
const AF_OTHER: u32 = 0;
let ctx = SockOpsContext {
op: BPF_SOCK_OPS_TCP_CONNECT_CB,
family: AF_OTHER,
..Default::default()
};
// Try to handle the event.
let mut handler = TcpSockOpsHandler::new(&ctx, mock_ktime_us);
handler.mock_ebpf_maps = Some(&mut mock_ebpf_maps);
let result = handler.handle_socket_event();
// Confirm failure.
assert_eq!(result, Err(SockOpsResultCode::ContextInvalid));
assert_eq!(mock_ebpf_maps.counters().active_connect_events, 1);
assert_eq!(mock_ebpf_maps.counters().sockets_invalid, 1);
assert!(mock_ebpf_maps.NFM_SK_PROPS.data.is_empty());
assert!(mock_ebpf_maps.NFM_SK_STATS.data.is_empty());
}
#[test]
fn test_ebpf_sock_op_connect() {
let mock_ktime_us: u64 = 0;
let mut mock_ebpf_maps = MockEbpfMaps::new();
let cookie: u64 = 197;
run_sock_ops_test(
cookie,
BPF_SOCK_OPS_TCP_CONNECT_CB,
&mut mock_ebpf_maps,
mock_ktime_us,
Ok(()),
);
assert_eq!(mock_ebpf_maps.counters().active_connect_events, 1);
assert_eq!(mock_ebpf_maps.NFM_SK_PROPS.data.len(), 1);
assert_eq!(mock_ebpf_maps.NFM_SK_STATS.data.len(), 1);
}
#[test]
fn test_ebpf_sock_op_passive_established() {
let mock_ktime_us: u64 = 0;
let mut mock_ebpf_maps = MockEbpfMaps::new();
let cookie: u64 = 197;
run_sock_ops_test(
cookie,
BPF_SOCK_OPS_PASSIVE_ESTABLISHED_CB,
&mut mock_ebpf_maps,
mock_ktime_us,
Ok(()),
);
assert_eq!(mock_ebpf_maps.counters().passive_established_events, 1);
assert_eq!(mock_ebpf_maps.NFM_SK_PROPS.data.len(), 1);
assert_eq!(mock_ebpf_maps.NFM_SK_STATS.data.len(), 1);
}
#[test]
fn test_ebpf_sock_op_active_established_sans_connect() {
let mock_ktime_us: u64 = 0;
let mut mock_ebpf_maps = MockEbpfMaps::new();
let cookie: u64 = 197;
let ctx = SockOpsContext {
op: BPF_SOCK_OPS_ACTIVE_ESTABLISHED_CB,
family: AF_INET,
cookie,
..Default::default()
};
// Handle the event.
let mut handler = TcpSockOpsHandler::new(&ctx, mock_ktime_us);
handler.mock_ebpf_maps = Some(&mut mock_ebpf_maps);
let result = handler.handle_socket_event();
// Validate results.
assert_eq!(result, Ok(()));
assert_eq!(mock_ebpf_maps.counters().active_established_events, 0);
assert!(mock_ebpf_maps.NFM_SK_PROPS.data.is_empty());
assert!(mock_ebpf_maps.NFM_SK_STATS.data.is_empty());
}
#[test]
fn test_ebpf_sock_op_active_established() {
let mock_ktime_us: u64 = 99;
let cookie: u64 = 197;
let mut mock_ebpf_maps = MockEbpfMaps::new();
// Handle two events.
run_sock_ops_test(
cookie,
BPF_SOCK_OPS_TCP_CONNECT_CB,
&mut mock_ebpf_maps,
mock_ktime_us,
Ok(()),
);
run_sock_ops_test_with_args(
cookie,
BPF_TCP_SYN_SENT,
BPF_SOCK_OPS_STATE_CB,
[BPF_TCP_SYN_SENT, BPF_TCP_ESTABLISHED],
&mut mock_ebpf_maps,
mock_ktime_us + 10,
Ok(()),
);
// Validate results.
assert_eq!(mock_ebpf_maps.NFM_SK_PROPS.data.len(), 1);
assert_eq!(mock_ebpf_maps.counters().active_connect_events, 1);
let composite_key = CpuSockKey {
sock_key: cookie,
cpu_id: MOCK_CPU_ID,
};
let _ = mock_ebpf_maps.sock_props(&composite_key);
let sock_stats = mock_ebpf_maps.sock_stats(&composite_key);
assert_eq!(sock_stats.connect_start_us, mock_ktime_us);
assert_eq!(sock_stats.connect_end_us, mock_ktime_us + 10);
}
#[test]
fn test_ebpf_sock_op_rtt_with_arg() {
let mock_ktime_us: u64 = 0;
let mut mock_ebpf_maps = MockEbpfMaps::new();
let cookie: u64 = 197;
// Handle a first event.
run_sock_ops_test(
cookie,
BPF_SOCK_OPS_TCP_CONNECT_CB,
&mut mock_ebpf_maps,
mock_ktime_us,
Ok(()),
);
// Handle a second event.
let ctx = SockOpsContext {
cookie,
op: BPF_SOCK_OPS_RTT_CB,
args: [52, 59 << 3],
..Default::default()
};
let mut handler = TcpSockOpsHandler::new(&ctx, mock_ktime_us);
handler.mock_ebpf_maps = Some(&mut mock_ebpf_maps);
let result = handler.handle_socket_event();
// Validate results.
assert_eq!(result, Ok(()));
let composite_key = CpuSockKey {
sock_key: cookie,
cpu_id: MOCK_CPU_ID,
};
let sock_stats = mock_ebpf_maps.sock_stats(&composite_key);
assert_eq!(sock_stats.rtt_count, 1);
assert_eq!(sock_stats.rtt_latest_us, 52);
assert_eq!(sock_stats.rtt_smoothed_us, 59);
assert_eq!(mock_ebpf_maps.counters().rtt_events, 1);
}
#[test]
fn test_ebpf_sock_op_rtt_no_arg() {
let mock_ktime_us: u64 = 0;
let mut mock_ebpf_maps = MockEbpfMaps::new();
let cookie: u64 = 197;
// Handle a first event.
run_sock_ops_test(
cookie,
BPF_SOCK_OPS_TCP_CONNECT_CB,
&mut mock_ebpf_maps,
mock_ktime_us,
Ok(()),
);
// Handle a second event.
let ctx = SockOpsContext {
cookie,
op: BPF_SOCK_OPS_RTT_CB,
args: [0, 0],
stats: SockOpsStats {
srtt_us: 59,
..Default::default()
},
..Default::default()
};
let mut handler = TcpSockOpsHandler::new(&ctx, mock_ktime_us);
handler.mock_ebpf_maps = Some(&mut mock_ebpf_maps);
let result = handler.handle_socket_event();
// Validate results.
assert_eq!(result, Ok(()));
let composite_key = CpuSockKey {
sock_key: cookie,
cpu_id: MOCK_CPU_ID,
};
let sock_stats = mock_ebpf_maps.sock_stats(&composite_key);
assert_eq!(sock_stats.rtt_count, 1);
assert_eq!(sock_stats.rtt_latest_us, 59);
assert_eq!(sock_stats.rtt_smoothed_us, 59);
assert_eq!(mock_ebpf_maps.counters().rtt_events, 1);
assert_eq!(mock_ebpf_maps.counters().rtts_invalid, 1);
}
#[test]
fn test_ebpf_sock_op_retrans() {
let mock_ktime_us: u64 = 0;
let mut mock_ebpf_maps = MockEbpfMaps::new();
let cookie: u64 = 197;
// A connection is initiated, followed by retransmits.
run_sock_ops_test(
cookie,
BPF_SOCK_OPS_TCP_CONNECT_CB,
&mut mock_ebpf_maps,
mock_ktime_us,
Ok(()),
);
let mut num_retrans = 11;
let args: [u32; 2] = [0, num_retrans];
run_sock_ops_test_with_args(
cookie,
BPF_TCP_SYN_SENT,
BPF_SOCK_OPS_RETRANS_CB,
args,
&mut mock_ebpf_maps,
mock_ktime_us,
Ok(()),
);
// After entering established, more retransmits are seen. Notice that the new state is
// reflected in the args, and not the sock context.
let args: [u32; 2] = [BPF_TCP_SYN_SENT, BPF_TCP_ESTABLISHED];
run_sock_ops_test_with_args(
cookie,
BPF_TCP_SYN_SENT,
BPF_SOCK_OPS_STATE_CB,
args,
&mut mock_ebpf_maps,
mock_ktime_us,
Ok(()),
);
num_retrans = 22;
let args: [u32; 2] = [0, num_retrans];
run_sock_ops_test_with_args(
cookie,
BPF_TCP_ESTABLISHED,
BPF_SOCK_OPS_RETRANS_CB,
args,
&mut mock_ebpf_maps,
mock_ktime_us,
Ok(()),
);
// Finally, during close, more retransmits are seen.
let args: [u32; 2] = [BPF_TCP_ESTABLISHED, BPF_TCP_FIN_WAIT1];
run_sock_ops_test_with_args(
cookie,
BPF_TCP_ESTABLISHED,
BPF_SOCK_OPS_STATE_CB,
args,
&mut mock_ebpf_maps,
mock_ktime_us,
Ok(()),
);
num_retrans = 33;
let args: [u32; 2] = [0, num_retrans];
run_sock_ops_test_with_args(
cookie,
BPF_TCP_FIN_WAIT1,
BPF_SOCK_OPS_RETRANS_CB,
args,
&mut mock_ebpf_maps,
mock_ktime_us,
Ok(()),
);
// Validate results.
let composite_key = CpuSockKey {
sock_key: cookie,
cpu_id: MOCK_CPU_ID,
};
let sock_stats = mock_ebpf_maps.sock_stats(&composite_key);
assert_eq!(sock_stats.retrans_syn, 11);
assert_eq!(sock_stats.retrans_est, 22);
assert_eq!(sock_stats.retrans_close, 33);
assert_eq!(mock_ebpf_maps.counters().retrans_events, 3);
}
#[test]
fn test_ebpf_sock_op_rto() {
let mock_ktime_us: u64 = 0;
let mut mock_ebpf_maps = MockEbpfMaps::new();
let cookie: u64 = 197;
// A connection is initiated, followed by an RTO.
run_sock_ops_test(
cookie,
BPF_SOCK_OPS_TCP_CONNECT_CB,
&mut mock_ebpf_maps,
mock_ktime_us,
Ok(()),
);
run_sock_ops_test(
cookie,
BPF_SOCK_OPS_RTO_CB,
&mut mock_ebpf_maps,
mock_ktime_us,
Ok(()),
);
// After entering established, we see another RTO.
let args: [u32; 2] = [BPF_TCP_SYN_SENT, BPF_TCP_ESTABLISHED];
run_sock_ops_test_with_args(
cookie,
BPF_TCP_SYN_SENT,
BPF_SOCK_OPS_STATE_CB,
args,
&mut mock_ebpf_maps,
mock_ktime_us,
Ok(()),
);
run_sock_ops_test_with_args(
cookie,
BPF_TCP_ESTABLISHED,
BPF_SOCK_OPS_RTO_CB,
[0, 0],
&mut mock_ebpf_maps,
mock_ktime_us,
Ok(()),
);
// Finally, during close, another RTO.
let args: [u32; 2] = [BPF_TCP_ESTABLISHED, BPF_TCP_FIN_WAIT1];
run_sock_ops_test_with_args(
cookie,
BPF_TCP_ESTABLISHED,
BPF_SOCK_OPS_STATE_CB,
args,
&mut mock_ebpf_maps,
mock_ktime_us,
Ok(()),
);
run_sock_ops_test_with_args(
cookie,
BPF_TCP_FIN_WAIT1,
BPF_SOCK_OPS_RTO_CB,
[0, 0],
&mut mock_ebpf_maps,
mock_ktime_us,
Ok(()),
);
// Validate results.
let composite_key = CpuSockKey {
sock_key: cookie,
cpu_id: MOCK_CPU_ID,
};
let sock_stats = mock_ebpf_maps.sock_stats(&composite_key);
assert_eq!(sock_stats.rtos_syn, 1);
assert_eq!(sock_stats.rtos_est, 1);
assert_eq!(sock_stats.rtos_close, 1);
assert_eq!(mock_ebpf_maps.counters().rto_events, 3);
}
#[test]
fn test_ebpf_sock_op_rst_on_passive_connect() {
let mock_ktime_us: u64 = 99;
let cookie: u64 = 197;
let mut mock_ebpf_maps = MockEbpfMaps::new();
// Receive a connection.
run_sock_ops_test_with_args(
cookie,
BPF_TCP_ESTABLISHED,
BPF_SOCK_OPS_PASSIVE_ESTABLISHED_CB,
[0, 0],
&mut mock_ebpf_maps,
mock_ktime_us,
Ok(()),
);
// Terminate the connect attempt.
let args: [u32; 2] = [BPF_TCP_ESTABLISHED, BPF_TCP_CLOSE];
run_sock_ops_test_with_args(
cookie,
BPF_TCP_ESTABLISHED,
BPF_SOCK_OPS_STATE_CB,
args,
&mut mock_ebpf_maps,
mock_ktime_us + 10,
Ok(()),
);
// Validate results.
let composite_key = CpuSockKey {
sock_key: cookie,
cpu_id: MOCK_CPU_ID,
};
let sock_wrap = mock_ebpf_maps.sock_stats(&composite_key);
assert_eq!(
sock_wrap.state_flags,
SockStateFlags::ENTERED_ESTABLISH
| SockStateFlags::STARTED_CLOSURE
| SockStateFlags::TERMINATED_FROM_EST
| SockStateFlags::CLOSED
);
assert_eq!(mock_ebpf_maps.counters().state_change_events, 1);
}
#[test]
fn test_ebpf_sock_op_rst_on_active_connect() {
let mock_ktime_us: u64 = 99;
let cookie: u64 = 197;
let mut mock_ebpf_maps = MockEbpfMaps::new();
// Initiate a connection.
run_sock_ops_test_with_args(
cookie,
BPF_TCP_SYN_SENT,
BPF_SOCK_OPS_TCP_CONNECT_CB,
[0, 0],
&mut mock_ebpf_maps,
mock_ktime_us,
Ok(()),
);
// Terminate the connect attempt.
let args: [u32; 2] = [BPF_TCP_SYN_SENT, BPF_TCP_CLOSE];
run_sock_ops_test_with_args(
cookie,
BPF_TCP_SYN_SENT,
BPF_SOCK_OPS_STATE_CB,
args,
&mut mock_ebpf_maps,
mock_ktime_us + 10,
Ok(()),
);
// Validate results.
let composite_key = CpuSockKey {
sock_key: cookie,
cpu_id: MOCK_CPU_ID,
};
let sock_wrap = mock_ebpf_maps.sock_stats(&composite_key);
assert_eq!(
sock_wrap.state_flags,
SockStateFlags::STARTED_CLOSURE
| SockStateFlags::TERMINATED_FROM_SYN
| SockStateFlags::CLOSED
);
assert_eq!(mock_ebpf_maps.counters().state_change_events, 1);
}
#[test]
fn test_ebpf_sock_op_rst_on_establish() {
let mock_ktime_us: u64 = 99;
let cookie: u64 = 197;
let mut mock_ebpf_maps = MockEbpfMaps::new();
// Handle two events.
run_sock_ops_test(
cookie,
BPF_SOCK_OPS_TCP_CONNECT_CB,
&mut mock_ebpf_maps,
mock_ktime_us,
Ok(()),
);
run_sock_ops_test_with_args(
cookie,
BPF_TCP_SYN_SENT,
BPF_SOCK_OPS_STATE_CB,
[BPF_TCP_SYN_SENT, BPF_TCP_ESTABLISHED],
&mut mock_ebpf_maps,
mock_ktime_us + 10,
Ok(()),
);
// Handle a 3rd event that mimicks a TCP RST by transitioning from ESTABLISHED to CLOSED.
let args: [u32; 2] = [BPF_TCP_ESTABLISHED, BPF_TCP_CLOSE];
run_sock_ops_test_with_args(
cookie,
BPF_TCP_ESTABLISHED,
BPF_SOCK_OPS_STATE_CB,
args,
&mut mock_ebpf_maps,
mock_ktime_us + 10,
Ok(()),
);
// Validate results.
let composite_key = CpuSockKey {
sock_key: cookie,
cpu_id: MOCK_CPU_ID,
};
let sock_wrap = mock_ebpf_maps.sock_stats(&composite_key);
assert_eq!(
sock_wrap.state_flags,
SockStateFlags::ENTERED_ESTABLISH
| SockStateFlags::STARTED_CLOSURE
| SockStateFlags::TERMINATED_FROM_EST
| SockStateFlags::CLOSED
);
assert_eq!(mock_ebpf_maps.counters().state_change_events, 2);
}
#[test]
fn test_ebpf_sock_op_max_connections() {
let mut mock_ktime_us: u64 = 99;
let mut cookie: u64 = 197;
let mut mock_ebpf_maps = MockEbpfMaps::new();
let effective_max = MAX_ENTRIES_SK_PROPS_HI.min(MAX_ENTRIES_SK_STATS_HI);
for _ in 0..effective_max {
run_sock_ops_test(
cookie,
BPF_SOCK_OPS_TCP_CONNECT_CB,
&mut mock_ebpf_maps,
mock_ktime_us,
Ok(()),
);
run_sock_ops_test_with_args(
cookie,
BPF_TCP_SYN_SENT,
BPF_SOCK_OPS_STATE_CB,
[BPF_TCP_SYN_SENT, BPF_TCP_ESTABLISHED],
&mut mock_ebpf_maps,
mock_ktime_us + 10,
Ok(()),
);
cookie += 3;
mock_ktime_us += 10;
}
// Validate results.
assert!(effective_max > 0);
assert_eq!(
mock_ebpf_maps.counters().active_connect_events,
effective_max.try_into().unwrap(),
);
assert_eq!(
mock_ebpf_maps.NFM_SK_PROPS.data.len(),
effective_max.try_into().unwrap()
);
assert_eq!(
mock_ebpf_maps.NFM_SK_STATS.data.len(),
effective_max.try_into().unwrap()
);
}
#[test]
fn test_ebpf_sock_op_too_many_connections() {
let mut mock_ktime_us: u64 = 99;
let mut cookie: u64 = 197;
let mut mock_ebpf_maps = MockEbpfMaps::new();
for _ in 0..MAX_ENTRIES_SK_PROPS_HI {
run_sock_ops_test(
cookie,
BPF_SOCK_OPS_TCP_CONNECT_CB,
&mut mock_ebpf_maps,
mock_ktime_us,
Ok(()),
);
cookie += 3;
mock_ktime_us += 10;
}
for _ in 0..10 {
run_sock_ops_test(
cookie,
BPF_SOCK_OPS_TCP_CONNECT_CB,
&mut mock_ebpf_maps,
mock_ktime_us,
Err(SockOpsResultCode::MapInsertionError),
);
cookie += 3;
mock_ktime_us += 10;
}
// Validate results.
assert!(MAX_ENTRIES_SK_PROPS_HI > 0);
assert_eq!(
mock_ebpf_maps.NFM_SK_PROPS.data.len(),
MAX_ENTRIES_SK_PROPS_HI.try_into().unwrap()
);
assert_eq!(
mock_ebpf_maps.counters().active_connect_events,
(MAX_ENTRIES_SK_PROPS_HI + 10).try_into().unwrap()
);
assert_eq!(mock_ebpf_maps.counters().map_insertion_errors, 10);
}
#[test]
fn test_ebpf_sock_op_too_many_events() {
let mut mock_ktime_us: u64 = 99;
let mut cookie: u64 = 197;
let mut mock_ebpf_maps = MockEbpfMaps::new();
for _ in 0..MAX_ENTRIES_SK_STATS_HI {
run_sock_ops_test_with_args(
cookie,
BPF_TCP_ESTABLISHED,
BPF_SOCK_OPS_STATE_CB,
[BPF_TCP_ESTABLISHED, BPF_TCP_CLOSE],
&mut mock_ebpf_maps,
mock_ktime_us,
Ok(()),
);
cookie += 3;
mock_ktime_us += 10;
}
for _ in 0..10 {
run_sock_ops_test_with_args(
cookie,
BPF_TCP_ESTABLISHED,
BPF_SOCK_OPS_STATE_CB,
[BPF_TCP_ESTABLISHED, BPF_TCP_CLOSE],
&mut mock_ebpf_maps,
mock_ktime_us,
Err(SockOpsResultCode::MapInsertionError),
);
cookie += 3;
mock_ktime_us += 10;
}
// Validate results.
assert!(MAX_ENTRIES_SK_STATS_HI > 0);
assert_eq!(
mock_ebpf_maps.NFM_SK_STATS.data.len(),
MAX_ENTRIES_SK_STATS_HI.try_into().unwrap()
);
assert_eq!(
mock_ebpf_maps.counters().state_change_events,
(MAX_ENTRIES_SK_STATS_HI + 10).try_into().unwrap()
);
assert_eq!(mock_ebpf_maps.counters().map_insertion_errors, 10);
}
#[test]
fn test_ebpf_sock_op_sample_discard() {
// A random value not divisible by our sampling interval results in a discard.
let control_data = ControlData {
sampling_interval: 2,
..Default::default()
};
let mut mock_ebpf_maps = MockEbpfMaps::new();
mock_ebpf_maps.mock_rand = 121;
mock_ebpf_maps
.NFM_CONTROL
.insert(&SINGLETON_KEY, &control_data, BPF_ANY)
.unwrap();
let conveyor = BpfControlConveyor { mock_ebpf_maps };
// We discard new socket events.
assert!(!conveyor.should_handle_event(BPF_SOCK_OPS_TCP_CONNECT_CB));
assert!(!conveyor.should_handle_event(BPF_SOCK_OPS_PASSIVE_ESTABLISHED_CB));
// We capture other events.
assert!(conveyor.should_handle_event(BPF_SOCK_OPS_ACTIVE_ESTABLISHED_CB));
assert!(conveyor.should_handle_event(BPF_SOCK_OPS_RETRANS_CB));
assert!(conveyor.should_handle_event(BPF_SOCK_OPS_RTT_CB));
assert!(conveyor.should_handle_event(BPF_SOCK_OPS_RTO_CB));
}
#[test]
fn test_ebpf_sock_op_sample_capture() {
// A random value that is divisible by our sampling interval results in a capture.
let control_data = ControlData {
sampling_interval: 2,
..Default::default()
};
let mut mock_ebpf_maps = MockEbpfMaps::new();
mock_ebpf_maps.mock_rand = 122;
mock_ebpf_maps
.NFM_CONTROL
.insert(&SINGLETON_KEY, &control_data, BPF_ANY)
.unwrap();
let conveyor = BpfControlConveyor { mock_ebpf_maps };
// We capture new socket events.
assert!(conveyor.should_handle_event(BPF_SOCK_OPS_TCP_CONNECT_CB));
assert!(conveyor.should_handle_event(BPF_SOCK_OPS_PASSIVE_ESTABLISHED_CB));
// We also capture other events.
assert!(conveyor.should_handle_event(BPF_SOCK_OPS_ACTIVE_ESTABLISHED_CB));
assert!(conveyor.should_handle_event(BPF_SOCK_OPS_RETRANS_CB));
assert!(conveyor.should_handle_event(BPF_SOCK_OPS_RTT_CB));
assert!(conveyor.should_handle_event(BPF_SOCK_OPS_RTO_CB));
}
}