in quic/s2n-quic-qns/src/xdp.rs [203:284]
fn bpf_task(&self, port: u16, rx_fds: Vec<(u32, socket::Fd)>) -> Result<()> {
// load the default BPF program from s2n-quic-xdp
let mut bpf = if self.bpf_trace {
let mut bpf = Ebpf::load(bpf::DEFAULT_PROGRAM_TRACE)?;
if let Err(err) = aya_log::EbpfLogger::init(&mut bpf) {
eprint!("error initializing BPF trace: {err:?}");
}
bpf
} else {
Ebpf::load(bpf::DEFAULT_PROGRAM)?
};
let interface = self.interface.clone();
let xdp_stats = self.xdp_stats;
let xdp_mode = self.xdp_mode.into();
let program: &mut programs::Xdp = bpf
.program_mut(bpf::PROGRAM_NAME)
.expect("missing default program")
.try_into()?;
program.load()?;
// attach the BPF program to the interface
let link_id = program.attach(&interface, xdp_mode)?;
let bpf_task = async move {
// register the port as active
let mut ports: HashMap<_, _, _> = bpf
.map_mut(bpf::PORT_MAP_NAME)
.expect("missing port map")
.try_into()?;
// the BPF program just needs to have a non-zero value for the port
let enabled = 1u8;
// no flags are needed
let flags = 0;
ports.insert(port, enabled, flags)?;
// register all of the RX sockets on each of the queues
let mut xskmap: XskMap<&mut MapData> = bpf
.map_mut(bpf::XSK_MAP_NAME)
.expect("missing socket map")
.try_into()?;
for (queue_id, socket) in &rx_fds {
xskmap.set(*queue_id, socket.as_raw_fd(), 0)?;
}
// print xdp stats every second, if configured
if xdp_stats {
loop {
tokio::time::sleep(core::time::Duration::from_secs(1)).await;
for (queue_id, socket) in &rx_fds {
if let Ok(stats) = syscall::statistics(socket) {
println!("stats[{queue_id}]: {stats:?}");
}
}
}
}
// we want this future to go until the end of the program so we can keep the BPF
// program active on the the NIC.
core::future::pending::<()>().await;
// retain the bpf program for the duration of execution
let _ = bpf;
let _ = link_id;
let _ = rx_fds;
Result::<(), crate::Error>::Ok(())
};
tokio::spawn(async move {
if let Err(error) = bpf_task.await {
panic!("BPF ERROR: {error}");
}
});
Ok(())
}