fn bpf_task()

in quic/s2n-quic-qns/src/xdp.rs [203:284]


    fn bpf_task(&self, port: u16, rx_fds: Vec<(u32, socket::Fd)>) -> Result<()> {
        // load the default BPF program from s2n-quic-xdp
        let mut bpf = if self.bpf_trace {
            let mut bpf = Ebpf::load(bpf::DEFAULT_PROGRAM_TRACE)?;

            if let Err(err) = aya_log::EbpfLogger::init(&mut bpf) {
                eprint!("error initializing BPF trace: {err:?}");
            }

            bpf
        } else {
            Ebpf::load(bpf::DEFAULT_PROGRAM)?
        };

        let interface = self.interface.clone();
        let xdp_stats = self.xdp_stats;
        let xdp_mode = self.xdp_mode.into();

        let program: &mut programs::Xdp = bpf
            .program_mut(bpf::PROGRAM_NAME)
            .expect("missing default program")
            .try_into()?;
        program.load()?;

        // attach the BPF program to the interface
        let link_id = program.attach(&interface, xdp_mode)?;

        let bpf_task = async move {
            // register the port as active
            let mut ports: HashMap<_, _, _> = bpf
                .map_mut(bpf::PORT_MAP_NAME)
                .expect("missing port map")
                .try_into()?;

            // the BPF program just needs to have a non-zero value for the port
            let enabled = 1u8;
            // no flags are needed
            let flags = 0;
            ports.insert(port, enabled, flags)?;

            // register all of the RX sockets on each of the queues
            let mut xskmap: XskMap<&mut MapData> = bpf
                .map_mut(bpf::XSK_MAP_NAME)
                .expect("missing socket map")
                .try_into()?;

            for (queue_id, socket) in &rx_fds {
                xskmap.set(*queue_id, socket.as_raw_fd(), 0)?;
            }

            // print xdp stats every second, if configured
            if xdp_stats {
                loop {
                    tokio::time::sleep(core::time::Duration::from_secs(1)).await;
                    for (queue_id, socket) in &rx_fds {
                        if let Ok(stats) = syscall::statistics(socket) {
                            println!("stats[{queue_id}]: {stats:?}");
                        }
                    }
                }
            }

            // we want this future to go until the end of the program so we can keep the BPF
            // program active on the the NIC.
            core::future::pending::<()>().await;

            // retain the bpf program for the duration of execution
            let _ = bpf;
            let _ = link_id;
            let _ = rx_fds;

            Result::<(), crate::Error>::Ok(())
        };

        tokio::spawn(async move {
            if let Err(error) = bpf_task.await {
                panic!("BPF ERROR: {error}");
            }
        });

        Ok(())
    }