listener.c (129 lines of code) (raw):

/* * Copyright (C) 2016, Facebook, Inc. * All rights reserved. * * This source code is licensed under the license found in the LICENSE file in * file in the root directory of this source tree. */ #include <stdlib.h> #include <stdint.h> #include <inttypes.h> #include <unistd.h> #include <errno.h> #include <signal.h> #include <string.h> #include <sys/types.h> #include <sys/socket.h> #include "include/common.h" #include "include/msgbuf-struct.h" #include "include/threads.h" #include "include/listener.h" static void handle_listen_error(int err) { switch(err) { case EINTR: /* * The fact that we got an error return means that recvmmsg() * hadn't actually done anything, so we can just loop back over * the call no problem. */ return; case 0: fatal("Unexpected EOF from recvmmsg()\n"); default: fatal("Unexpected listen error: %m (-%d)\n", errno); } } static struct msg_buf *msgbuf_from_iovec(struct iovec *vecptr) { return container_of(vecptr, struct msg_buf, iovec); } static unsigned long hash_srcaddr(struct in6_addr *addr) { uint32_t *addrptr = (uint32_t *)addr; return jhash2(addrptr, sizeof(*addr) / sizeof(*addrptr), LISTEN_SEED); } static void prequeue_msgbuf(struct ncrx_listener *listener, struct msg_buf *buf) { struct ncrx_prequeue *prequeue; unsigned long hash; hash = hash_srcaddr(&buf->src.sin6_addr); prequeue = &listener->prequeues[hash % listener->nr_workers]; if (prequeue->queue_head) prequeue->queue_tail->next = buf; else prequeue->queue_head = buf; prequeue->queue_tail = buf; prequeue->count++; } static void reinit_mmsghdr_vec(struct mmsghdr *vec, int nr, int rcvbufsz) { struct msg_buf *cur; int i; memset(vec, 0, sizeof(*vec) * nr); for (i = 0; i < nr; i++) { cur = malloc(sizeof(*cur) + rcvbufsz); if (!cur) fatal("-ENOMEM after %d/%d rcvbufs\n", i, nr); memset(cur, 0, sizeof(*cur)); cur->buf[rcvbufsz - 1] = '\0'; cur->iovec.iov_base = &cur->buf; cur->iovec.iov_len = rcvbufsz - 1; vec[i].msg_hdr.msg_iov = &cur->iovec; vec[i].msg_hdr.msg_iovlen = 1; vec[i].msg_hdr.msg_name = &cur->src; vec[i].msg_hdr.msg_namelen = sizeof(cur->src); } } static struct mmsghdr *alloc_mmsghdr_vec(int nr, int rcvbufsz) { struct mmsghdr *mmsgvec; mmsgvec = malloc(sizeof(*mmsgvec) * nr); if (!mmsgvec) fatal("Unable to allocate mmsghdr array\n"); reinit_mmsghdr_vec(mmsgvec, nr, rcvbufsz); return mmsgvec; } static void free_mmsghdr_vec(struct mmsghdr *vec, int nr) { struct msg_buf *cur; int i; for (i = 0; i < nr; i++) { cur = msgbuf_from_iovec(vec[i].msg_hdr.msg_iov); free(cur); } free(vec); } static int get_listen_socket(struct sockaddr_in6 *bindaddr) { int fd, ret, optval = 1; fd = socket(AF_INET6, SOCK_DGRAM, 0); if (fd == -1) fatal("Couldn't get socket: %m\n"); ret = setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof(optval)); if (ret == -1) fatal("Couldn't set SO_REUSEPORT on socket: %m\n"); ret = bind(fd, bindaddr, sizeof(*bindaddr)); if (ret == -1) fatal("Couldn't bind: %m\n"); return fd; } void *udp_listener_thread(void *arg) { int fd, nr_recv, i; uint64_t now; struct ncrx_listener *us = arg; struct mmsghdr *vec; struct msg_buf *cur; fd = get_listen_socket(us->address); vec = alloc_mmsghdr_vec(us->batch, RCVBUF_SIZE); while (!us->stop) { nr_recv = recvmmsg(fd, vec, us->batch, MSG_WAITFORONE, NULL); if (nr_recv <= 0) { handle_listen_error(errno); continue; } debug("recvmmsg() got %d packets\n", nr_recv); now = now_real_ms(); for (i = 0; i < nr_recv; i++) { cur = msgbuf_from_iovec(vec[i].msg_hdr.msg_iov); cur->rcv_flags = vec[i].msg_hdr.msg_flags; cur->rcv_bytes = vec[i].msg_len; cur->rcv_time = now; prequeue_msgbuf(us, cur); us->processed++; } enqueue_and_wake_all(us); reinit_mmsghdr_vec(vec, nr_recv, RCVBUF_SIZE); } free_mmsghdr_vec(vec, us->batch); return NULL; }