in metron-sensors/fastcapa/src/worker.c [87:159]
static int receive_worker(rx_worker_params* params)
{
const uint8_t nb_ports = rte_eth_dev_count();
const unsigned socket_id = rte_socket_id();
const uint16_t rx_burst_size = params->rx_burst_size;
const uint16_t queue_id = params->queue_id;
struct rte_ring *ring = params->output_ring;
int i, dev_socket_id;
uint8_t port;
struct rte_mbuf* pkts[MAX_RX_BURST_SIZE];
//const int attempts = MAX_RX_BURST_SIZE / rx_burst_size;
const int attempts = 0;
LOG_INFO(USER1, "Receive worker started; core=%u, socket=%u, queue=%u attempts=%d \n", rte_lcore_id(), socket_id, queue_id, attempts);
// validate each port
for (port = 0; port < nb_ports; port++) {
// skip ports that are not enabled
if ((params->enabled_port_mask & (1 << port)) == 0) {
continue;
}
// check for cross-socket communication
dev_socket_id = rte_eth_dev_socket_id(port);
if (dev_socket_id >= 0 && ((unsigned) dev_socket_id) != socket_id) {
LOG_WARN(USER1, "Warning: Port %u on different socket from worker; performance will suffer\n", port);
}
}
port = 0;
while (!quit_signal) {
// skip to the next enabled port
if ((params->enabled_port_mask & (1 << port)) == 0) {
if (++port == nb_ports) {
port = 0;
}
continue;
}
// receive a 'burst' of packets. if get back the max number requested, then there
// are likely more packets waiting. immediately go back and grab some.
i = 0;
uint16_t nb_in = 0, nb_in_last = 0;
do {
nb_in_last = rte_eth_rx_burst(port, queue_id, &pkts[nb_in], rx_burst_size);
nb_in += nb_in_last;
} while (++i < attempts && nb_in_last == rx_burst_size);
params->stats.in += nb_in;
// add each packet to the ring buffer
if(likely(nb_in) > 0) {
const uint16_t nb_out = rte_ring_enqueue_burst(ring, (void *) pkts, nb_in, NULL);
params->stats.out += nb_out;
params->stats.drops += (nb_in - nb_out);
}
// clean-up the packet buffer
for (i = 0; i < nb_in; i++) {
rte_pktmbuf_free(pkts[i]);
}
// wrap-around to the first port
if (++port == nb_ports) {
port = 0;
}
}
LOG_INFO(USER1, "Receive worker finished; core=%u, socket=%u, queue=%u \n", rte_lcore_id(), socket_id, queue_id);
return 0;
}