threads.c (169 lines of code) (raw):

/* * Copyright (C) 2016, Facebook, Inc. * All rights reserved. * * This source code is licensed under the license found in the LICENSE file in * the root directory of this source tree. */ #include <stdlib.h> #include <stdint.h> #include <inttypes.h> #include <signal.h> #include <pthread.h> #include <unistd.h> #include <sys/socket.h> #include "include/common.h" #include "include/msgbuf-struct.h" #include "include/listener.h" #include "include/worker.h" #include "include/threads.h" struct tctl { int nr_listeners; int nr_workers; struct ncrx_listener *listeners; struct ncrx_worker *workers; }; static void wake_thread(struct ncrx_listener *listener, int worker) { struct ncrx_worker *tgt = &listener->workers[worker]; assert_pthread_mutex_locked(&tgt->queuelock); debug("Waking thread %d\n", worker); pthread_cond_signal(&tgt->cond); } static void push_prequeue_to_worker(struct ncrx_listener *listener, int worker) { struct ncrx_worker *tgt = &listener->workers[worker]; struct ncrx_prequeue *prequeue = &listener->prequeues[worker]; assert_pthread_mutex_locked(&tgt->queuelock); if (tgt->queue_head) tgt->queue_tail->next = prequeue->queue_head; else tgt->queue_head = prequeue->queue_head; tgt->queue_tail = prequeue->queue_tail; prequeue->queue_head = NULL; debug("Listener %d pushed %d pkts to worker %d (backlog: %d)\n", listener->thread_nr, prequeue->count, worker->thread_nr, tgt->nr_queued); tgt->nr_queued += prequeue->count; prequeue->count = 0; } static void enqueue_and_wake_worker(struct ncrx_listener *listener, int worker) { struct ncrx_worker *tgt = &listener->workers[worker]; pthread_mutex_lock(&tgt->queuelock); push_prequeue_to_worker(listener, worker); wake_thread(listener, worker); pthread_mutex_unlock(&tgt->queuelock); } static int prequeue_is_empty(struct ncrx_listener *listener, int worker) { struct ncrx_prequeue *prequeue = &listener->prequeues[worker]; return prequeue->queue_head == NULL; } void enqueue_and_wake_all(struct ncrx_listener *listener) { int i; for (i = 0; i < listener->nr_workers; i++) if (!prequeue_is_empty(listener, i)) enqueue_and_wake_worker(listener, i); } static void stop_and_wait_for_workers(struct tctl *ctl) { int i; uint64_t total_processed = 0, total_hosts = 0; for (i = 0; i < ctl->nr_workers; i++) { pthread_mutex_lock(&ctl->workers[i].queuelock); ctl->workers[i].stop = 1; pthread_cond_signal(&ctl->workers[i].cond); pthread_mutex_unlock(&ctl->workers[i].queuelock); pthread_join(ctl->workers[i].id, NULL); pthread_mutex_destroy(&ctl->workers[i].queuelock); pthread_cond_destroy(&ctl->workers[i].cond); pthread_condattr_destroy(&ctl->workers[i].condattr); total_processed += ctl->workers[i].processed; total_hosts += ctl->workers[i].hosts_seen; log("Exiting worker %d got %" PRIu64 " msgs from %" PRIu64 " hosts\n", i, ctl->workers[i].processed, ctl->workers[i].hosts_seen); } log("Total messages processed by workers: %" PRIu64 " from %" PRIu64 " hosts\n", total_processed, total_hosts); free(ctl->workers); } static void stop_and_wait_for_listeners(struct tctl *ctl) { int i; uint64_t total_processed = 0; for (i = 0; i < ctl->nr_listeners; i++) { ctl->listeners[i].stop = 1; pthread_kill(ctl->listeners[i].id, SIGUSR1); pthread_join(ctl->listeners[i].id, NULL); free(ctl->listeners[i].prequeues); total_processed += ctl->listeners[i].processed; log("Exiting listener %d queued %" PRIu64 " messages\n", i, ctl->listeners[i].processed); } log("Total messages processed by listeners: %" PRIu64 "\n", total_processed); free(ctl->listeners); } static void create_worker_threads(struct tctl *ctl, struct netconsd_params *p) { struct ncrx_worker *cur, *workers; int i, r; workers = calloc(p->nr_workers, sizeof(*workers)); if (!workers) fatal("Couldn't allocate thread structures\n"); for (i = 0; i < p->nr_workers; i++) { cur = &workers[i]; pthread_mutex_init(&cur->queuelock, NULL); pthread_condattr_init(&cur->condattr); pthread_condattr_setclock(&cur->condattr, CLOCK_MONOTONIC); pthread_cond_init(&cur->cond, &cur->condattr); cur->queue_head = NULL; cur->thread_nr = i; cur->gc_int_ms = p->gc_int_ms; cur->gc_age_ms = p->gc_age_ms; cur->lastgc = p->gc_int_ms ? now_mono_ms() / p->gc_int_ms : 0; r = pthread_create(&cur->id, NULL, ncrx_worker_thread, cur); if (r) fatal("%d/%d failed: -%d\n", i, p->nr_workers, r); } ctl->nr_workers = p->nr_workers; ctl->workers = workers; } static void create_listener_threads(struct tctl *ctl, struct netconsd_params *p) { struct ncrx_prequeue *prequeues; struct ncrx_listener *cur, *listeners; int i, r; listeners = calloc(p->nr_listeners, sizeof(*listeners)); if (!listeners) fatal("Couldn't allocate listeners: %m\n"); for (i = 0; i < p->nr_listeners; i++) { cur = &listeners[i]; prequeues = calloc(ctl->nr_workers, sizeof(*prequeues)); if (!prequeues) fatal("ENOMEM %d/%d\n", i, p->nr_listeners); cur->thread_nr = i; cur->prequeues = prequeues; cur->workers = ctl->workers; cur->nr_workers = ctl->nr_workers; cur->batch = p->mmsg_batch; cur->address = &p->listen_addr; r = pthread_create(&cur->id, NULL, udp_listener_thread, cur); if (r) fatal("%d/%d failed: -%d\n", i, p->nr_listeners, r); } ctl->nr_listeners = p->nr_listeners; ctl->listeners = listeners; } void destroy_threads(struct tctl *ctl) { stop_and_wait_for_listeners(ctl); stop_and_wait_for_workers(ctl); free(ctl); } struct tctl *create_threads(struct netconsd_params *p) { struct tctl *ret; ret = calloc(1, sizeof(*ret)); if (!ret) fatal("Couldn't allocate thread structures\n"); ret->nr_workers = p->nr_workers; create_worker_threads(ret, p); create_listener_threads(ret, p); return ret; }