static int receive_worker()

in metron-sensors/fastcapa/src/worker.c [87:159]


static int receive_worker(rx_worker_params* params)
{
    const uint8_t nb_ports = rte_eth_dev_count();
    const unsigned socket_id = rte_socket_id();
    const uint16_t rx_burst_size = params->rx_burst_size;
    const uint16_t queue_id = params->queue_id;
    struct rte_ring *ring = params->output_ring;
    int i, dev_socket_id;
    uint8_t port;
    struct rte_mbuf* pkts[MAX_RX_BURST_SIZE];
    //const int attempts = MAX_RX_BURST_SIZE / rx_burst_size;
    const int attempts = 0;

    LOG_INFO(USER1, "Receive worker started; core=%u, socket=%u, queue=%u attempts=%d \n", rte_lcore_id(), socket_id, queue_id, attempts);

    // validate each port
    for (port = 0; port < nb_ports; port++) {

        // skip ports that are not enabled
        if ((params->enabled_port_mask & (1 << port)) == 0) {
            continue;
        }

        // check for cross-socket communication
        dev_socket_id = rte_eth_dev_socket_id(port);
        if (dev_socket_id >= 0 && ((unsigned) dev_socket_id) != socket_id) {
            LOG_WARN(USER1, "Warning: Port %u on different socket from worker; performance will suffer\n", port);
        }
    }

    port = 0;
    while (!quit_signal) {

        // skip to the next enabled port
        if ((params->enabled_port_mask & (1 << port)) == 0) {
            if (++port == nb_ports) {
                port = 0;
            }
            continue;
        }

        // receive a 'burst' of packets. if get back the max number requested, then there
        // are likely more packets waiting. immediately go back and grab some.
        i = 0;
        uint16_t nb_in = 0, nb_in_last = 0;
        do {
            nb_in_last = rte_eth_rx_burst(port, queue_id, &pkts[nb_in], rx_burst_size);
            nb_in += nb_in_last;

        } while (++i < attempts && nb_in_last == rx_burst_size);
        params->stats.in += nb_in;

        // add each packet to the ring buffer
        if(likely(nb_in) > 0) {
          const uint16_t nb_out = rte_ring_enqueue_burst(ring, (void *) pkts, nb_in, NULL);
          params->stats.out += nb_out;
          params->stats.drops += (nb_in - nb_out);
        }

        // clean-up the packet buffer
        for (i = 0; i < nb_in; i++) {
          rte_pktmbuf_free(pkts[i]);
        }

        // wrap-around to the first port
        if (++port == nb_ports) {
            port = 0;
        }
    }

    LOG_INFO(USER1, "Receive worker finished; core=%u, socket=%u, queue=%u \n", rte_lcore_id(), socket_id, queue_id);
    return 0;
}