/*
* Copyright (C) 2016, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the license found in the LICENSE file in
* the root directory of this source tree.
*/
#include <stdlib.h>
#include <stdint.h>
#include <inttypes.h>
#include <pthread.h>
#include <string.h>
#include <limits.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <ncrx.h>
#include "include/common.h"
#include "include/msgbuf-struct.h"
#include "include/output.h"
#include "include/worker.h"
static const struct ncrx_param ncrx_param = {
.nr_slots = 512,
.retx_intv = NETCONS_RTO,
.msg_timeout = NETCONS_RTO,
.oos_timeout = NETCONS_RTO,
};
/*
* Keep it simple: just use a boring probing hashtable that resizes.
*/
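/*
 * Intrusive node for a bucket's pending ncrx callback. Nodes live on one of
 * the NETCONS_RTO circular lists in ncrx_worker->tlist (a timer wheel with
 * one slot per millisecond); @when is the absolute CLOCK_MONOTONIC time, in
 * milliseconds, at which the callback should fire.
 */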
struct timerlist {
struct timerlist *prev;
struct timerlist *next;
uint64_t when;
};
struct bucket {
struct in6_addr src;
struct ncrx *ncrx;
uint64_t last_seen;
struct timerlist timernode;
};
struct hashtable {
unsigned long order;
unsigned long load;
struct bucket table[];
};
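/*
 * Hash the source IPv6 address with jhash2() and mask the result down to an
 * index into a table of 2^order buckets.
 */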
static unsigned long hash_srcaddr(struct in6_addr *addr)
{
uint32_t *addrptr = (uint32_t *)addr;
return jhash2(addrptr, sizeof(*addr) / sizeof(*addrptr), WORKER_SEED);
}
static unsigned long order_mask(int order)
{
return (1UL << order) - 1;
}
static unsigned long htable_mask(unsigned long hash, int order)
{
return hash & order_mask(order);
}
static unsigned long htable_hash(struct hashtable *h, struct in6_addr *s)
{
return htable_mask(hash_srcaddr(s), h->order);
}
static int srcaddr_compar(struct in6_addr *a, struct in6_addr *b)
{
return memcmp(a, b, sizeof(*a));
}
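/*
 * Linear-probe lookup: return the bucket holding @src if it exists, or the
 * first free bucket where it should be inserted. Dies if the table is full.
 */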
static struct bucket *hlookup(struct hashtable *h, struct in6_addr *src)
{
unsigned long origidx, idx;
origidx = htable_hash(h, src);
idx = origidx;
while (h->table[idx].ncrx && srcaddr_compar(&h->table[idx].src, src)) {
idx = htable_mask(idx + 1, h->order);
fatal_on(idx == origidx, "Worker hashtable is full\n");
}
return &h->table[idx];
}
/*
* Use -1 to represent "no wake needed"
*/
static void reset_waketime(struct ncrx_worker *cur)
{
cur->wake.tv_sec = -1;
}
static uint64_t ms_from_timespec(struct timespec *t)
{
return t->tv_sec * 1000LL + t->tv_nsec / 1000000L;
}
/*
* Update the waketime if @when is before the current waketime.
*
* We assume that CLOCK_MONOTONIC cannot wrap: strictly speaking this is wrong,
 * since POSIX allows the MONOTONIC clock to start from an arbitrary value;
 * but since it starts from zero on Linux, I'm not going to jump through
 * hoops here.
*/
static void maybe_update_wake(struct ncrx_worker *cur, uint64_t when)
{
uint64_t curwake = ms_from_timespec(&cur->wake);
if ((int64_t)curwake >= 0LL && curwake <= when)
return;
cur->wake.tv_sec = when / 1000LL;
cur->wake.tv_nsec = (when % 1000LL) * 1000000L;
}
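/*
 * The largest representable timespec, handed to pthread_cond_timedwait() when
 * no ncrx callback is pending so the worker sleeps until it is signaled.
 */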
static const struct timespec end_of_time = {
.tv_sec = (time_t)((1ULL << ((sizeof(time_t) << 3) - 1)) - 1),
};
static const struct timespec *next_waketime(struct ncrx_worker *cur)
{
if (cur->wake.tv_sec == -1)
return &end_of_time;
return &cur->wake;
}
static struct bucket *bucket_from_timernode(struct timerlist *node)
{
return container_of(node, struct bucket, timernode);
}
static void timerlist_init(struct timerlist *node)
{
node->next = node;
node->prev = node;
node->when = 0;
}
static int timerlist_empty(struct timerlist *node)
{
return node->next == node;
}
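/*
 * Append @node at the tail of @list. Entries are queued in callback-time
 * order, which schedule_ncrx_callback() enforces with its clamp().
 */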
static void timerlist_append(struct timerlist *node, struct timerlist *list)
{
struct timerlist *prev = list->prev;
fatal_on(!timerlist_empty(node), "Queueing node already on list\n");
node->next = list;
node->prev = prev;
prev->next = node;
list->prev = node;
}
static void timerlist_del(struct timerlist *node)
{
struct timerlist *prev = node->prev;
struct timerlist *next = node->next;
prev->next = next;
next->prev = prev;
timerlist_init(node);
}
/*
 * Return the callback time of the most recently queued item on the list.
*/
static uint64_t timerlist_peek(struct timerlist *list)
{
if (timerlist_empty(list))
return 0;
return list->prev->when;
}
#define timerlist_for_each(this, n, thead) \
for (this = (thead)->next, n = this->next; this != (thead); \
this = n, n = this->next)
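/*
 * Allocate the timer wheel: NETCONS_RTO list heads, one per millisecond of
 * the retransmission window. A callback scheduled for time @when is queued on
 * list @when % NETCONS_RTO.
 */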
static struct timerlist *create_timerlists(void)
{
struct timerlist *ret;
int i;
ret = calloc(NETCONS_RTO, sizeof(*ret));
if (!ret)
fatal("Unable to allocate timerlist\n");
for (i = 0; i < NETCONS_RTO; i++)
timerlist_init(&ret[i]);
return ret;
}
static void destroy_timerlists(struct timerlist *timerlist)
{
free(timerlist);
}
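/*
 * Allocate a table of 2^@order buckets. If @old is given, rehash every
 * occupied bucket into the new table, fix up any timerlist links pointing at
 * the old buckets, and free @old.
 */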
static struct hashtable *create_hashtable(int order, struct hashtable *old)
{
struct hashtable *new;
struct bucket *bkt;
unsigned long i;
new = zalloc(sizeof(*new) + sizeof(struct bucket) * (1UL << order));
if (!new)
fatal("Unable to allocate hashtable\n");
new->order = order;
if (!old)
return new;
for (i = 0; i < (1UL << old->order); i++) {
if (old->table[i].ncrx) {
bkt = hlookup(new, &old->table[i].src);
memcpy(bkt, &old->table[i], sizeof(*bkt));
/*
* If the timernode wasn't on a list, initialize it as
* empty for the new bucket. If it was, update its
* neighbors to point to the new bucket.
*/
if (bkt->timernode.next == &old->table[i].timernode) {
timerlist_init(&bkt->timernode);
} else {
bkt->timernode.next->prev = &bkt->timernode;
bkt->timernode.prev->next = &bkt->timernode;
}
}
}
new->load = old->load;
free(old);
return new;
}
static void destroy_hashtable(struct hashtable *ht)
{
unsigned long i;
for (i = 0; i < (1UL << ht->order); i++)
if (ht->table[i].ncrx)
ncrx_destroy(ht->table[i].ncrx);
free(ht);
}
static void maybe_resize_hashtable(struct ncrx_worker *cur, unsigned long new)
{
unsigned long neworder;
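	/*
	 * (load + new) >> (order - 2) < 3 means the projected population is
	 * below 3/4 of the 2^order buckets, i.e. under 75% full.
	 */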
if ((cur->ht->load + new) >> (cur->ht->order - 2) < 3)
return;
/*
* The hashtable is more than 75% full. Resize it such that it can take
* @new additional client hosts and be less than 50% full.
*/
neworder = LONG_BIT - __builtin_clzl(cur->ht->load + new) + 1;
cur->ht = create_hashtable(neworder, cur->ht);
}
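/*
 * Remove @victim from the table: drop any pending callback, destroy its ncrx
 * state, and rehash the entries in the probe run that follows so that
 * open-addressed lookups continue to work.
 */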
static void hdelete(struct hashtable *h, struct bucket *victim)
{
struct bucket *old, *new;
unsigned long origidx, idx;
fatal_on(!victim->ncrx, "Attempt to delete free bucket\n");
if (!timerlist_empty(&victim->timernode))
timerlist_del(&victim->timernode);
h->load--;
ncrx_destroy(victim->ncrx);
memset(victim, 0, sizeof(*victim));
/*
* There's potential to be clever here, but for now just be pedantic and
* rebucket any potentially probed entries.
*/
origidx = victim - h->table;
idx = origidx;
while (h->table[idx].ncrx) {
old = &h->table[idx];
new = hlookup(h, &old->src);
if (new != old) {
memcpy(new, old, sizeof(*new));
memset(old, 0, sizeof(*old));
/*
* If the timernode wasn't on a list, initialize it as
* empty for the new bucket. If it was, update its
* neighbors to point to the new bucket.
*/
if (new->timernode.next == &old->timernode) {
timerlist_init(&new->timernode);
} else {
new->timernode.next->prev = &new->timernode;
new->timernode.prev->next = &new->timernode;
}
}
idx = htable_mask(idx + 1, h->order);
fatal_on(idx == origidx, "Infinite loop in hdelete()\n");
}
}
/*
* Simple garbage collection. This is meant to be rare (on the order of once per
* hour), so maintaining an LRU list isn't worth the overhead: just blow through
* the whole table. Worst case it's ~50MB.
*/
static void try_to_garbage_collect(struct ncrx_worker *cur)
{
unsigned long i, count = 0;
uint64_t now, end;
struct bucket *bkt;
now = now_mono_ms();
for (i = 0; i < (1UL << cur->ht->order); i++) {
bkt = &cur->ht->table[i];
if (bkt->ncrx && now - bkt->last_seen > cur->gc_age_ms) {
hdelete(cur->ht, bkt);
count++;
}
}
end = now_mono_ms();
log("Worker %d GC'd %lu in %" PRIu64 "ms\n", cur->thread_nr, count,
end - now);
}
static void maybe_garbage_collect(struct ncrx_worker *cur)
{
uint64_t nowgc;
if (!cur->gc_int_ms)
return;
nowgc = now_mono_ms() / cur->gc_int_ms;
if (nowgc > cur->lastgc) {
try_to_garbage_collect(cur);
cur->lastgc = nowgc;
}
}
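/*
 * @when is the absolute monotonic time, in milliseconds, at which ncrx wants
 * ncrx_process() to be invoked again for this bucket; UINT64_MAX means no
 * callback is needed.
 */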
static void schedule_ncrx_callback(struct ncrx_worker *cur, struct bucket *bkt,
uint64_t when)
{
struct timerlist *tgtlist;
uint64_t now;
if (when == UINT64_MAX) {
/*
	 * No callback needed. If one was scheduled previously, it is now
	 * obsolete, so just remove ourselves from the timerlist.
*/
if (!timerlist_empty(&bkt->timernode))
timerlist_del(&bkt->timernode);
return;
}
/*
	 * Never queue callbacks outside the current RTO window. This clamp()
	 * is what guarantees that the callbacks in each timerlist are ordered
	 * from earliest to latest: at any given moment only one callback time
	 * corresponds to each bucket, and time cannot go backwards.
*/
now = now_mono_ms();
when = clamp(when, now + 1, now + NETCONS_RTO);
/*
* If the bucket is already on a timerlist, we only requeue it if the
* callback needs to happen earlier than the one currently queued.
*/
if (!timerlist_empty(&bkt->timernode)) {
if (when > bkt->timernode.when)
return;
timerlist_del(&bkt->timernode);
}
tgtlist = &cur->tlist[when % NETCONS_RTO];
fatal_on(when < timerlist_peek(tgtlist), "Timerlist ordering broken\n");
bkt->timernode.when = when;
timerlist_append(&bkt->timernode, tgtlist);
maybe_update_wake(cur, when);
}
/*
* Read any pending messages out of the bucket, and invoke the output pipeline
* with the extended metadata.
*/
static void drain_bucket_ncrx(struct ncrx_worker *cur, struct bucket *bkt)
{
struct ncrx_msg *out;
uint64_t when;
while ((out = ncrx_next_msg(bkt->ncrx))) {
execute_output_pipeline(cur->thread_nr, &bkt->src, NULL, out);
free(out);
}
when = ncrx_invoke_process_at(bkt->ncrx);
schedule_ncrx_callback(cur, bkt, when);
}
/*
* Execute callbacks for a specific timerlist, until either the list is empty or
* we reach an entry that was queued for a time in the future.
*/
static void do_ncrx_callbacks(struct ncrx_worker *cur, struct timerlist *list)
{
uint64_t now = now_mono_ms();
struct timerlist *tnode, *tmp;
struct bucket *bkt;
timerlist_for_each(tnode, tmp, list) {
if (tnode->when > now)
break;
/*
* Remove the bucket from the list first, since it might end up
* being re-added to another timerlist by drain_bucket_ncrx().
*/
timerlist_del(tnode);
bkt = bucket_from_timernode(tnode);
ncrx_process(NULL, now, 0, bkt->ncrx);
drain_bucket_ncrx(cur, bkt);
}
}
/*
* We have no idea how large the queue we just processed was: it could have
* taken tens of seconds. So we must handle wraparound in the tlist array.
*/
static uint64_t run_ncrx_callbacks(struct ncrx_worker *cur, uint64_t lastrun)
{
uint64_t i, now = now_mono_ms();
if (now == lastrun)
goto out;
fatal_on(now < lastrun, "Time went backwards\n");
/*
* It's possible we wrapped: in that case, we simply iterate over the
* entire wheel and drain each list until we hit a callback after now.
	 * Otherwise, we only iterate over the wheel slots in [lastrun, now].
*/
for (i = max(lastrun, now - NETCONS_RTO + 1); i <= now; i++)
do_ncrx_callbacks(cur, &cur->tlist[i % NETCONS_RTO]);
out:
return now;
}
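/*
 * Feed one received datagram to the ncrx instance for its source address,
 * creating the bucket on first sight. If ncrx_process() accepts the payload
 * (returns zero), drain any completed messages; otherwise pass the raw text
 * straight to the output pipeline.
 */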
static void consume_msgbuf(struct ncrx_worker *cur, struct msg_buf *buf)
{
struct bucket *ncrx_bucket;
ncrx_bucket = hlookup(cur->ht, &buf->src.sin6_addr);
if (!ncrx_bucket->ncrx) {
ncrx_bucket->ncrx = ncrx_create(&ncrx_param);
timerlist_init(&ncrx_bucket->timernode);
memcpy(&ncrx_bucket->src, &buf->src.sin6_addr,
sizeof(ncrx_bucket->src));
cur->ht->load++;
}
ncrx_bucket->last_seen = buf->rcv_time;
buf->buf[buf->rcv_bytes] = '\0';
if (!ncrx_process(buf->buf, now_mono_ms(), buf->rcv_time,
ncrx_bucket->ncrx)) {
drain_bucket_ncrx(cur, ncrx_bucket);
return;
}
execute_output_pipeline(cur->thread_nr, &ncrx_bucket->src, buf, NULL);
}
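/*
 * Detach the entire prequeue while holding cur->queuelock, leaving the shared
 * head empty for the enqueueing thread to refill.
 */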
static struct msg_buf *grab_prequeue(struct ncrx_worker *cur)
{
struct msg_buf *ret;
assert_pthread_mutex_locked(&cur->queuelock);
ret = cur->queue_head;
cur->queue_head = NULL;
return ret;
}
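/*
 * Worker main loop: sleep until new messages are queued or the next scheduled
 * ncrx callback is due, drain the prequeue with the queuelock dropped, then
 * run garbage collection and any due callbacks before sleeping again.
 */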
void *ncrx_worker_thread(void *arg)
{
struct ncrx_worker *cur = arg;
struct msg_buf *curbuf, *tmp;
uint64_t lastrun = now_mono_ms();
int nr_dequeued;
cur->ht = create_hashtable(16, NULL);
cur->tlist = create_timerlists();
reset_waketime(cur);
pthread_mutex_lock(&cur->queuelock);
while (!cur->stop) {
pthread_cond_timedwait(&cur->cond, &cur->queuelock,
next_waketime(cur));
reset_waketime(cur);
morework:
curbuf = grab_prequeue(cur);
nr_dequeued = cur->nr_queued;
cur->nr_queued = 0;
pthread_mutex_unlock(&cur->queuelock);
maybe_resize_hashtable(cur, nr_dequeued);
while ((tmp = curbuf)) {
consume_msgbuf(cur, curbuf);
curbuf = curbuf->next;
free(tmp);
cur->processed++;
}
if (!cur->stop) {
maybe_garbage_collect(cur);
lastrun = run_ncrx_callbacks(cur, lastrun);
}
pthread_mutex_lock(&cur->queuelock);
if (cur->queue_head)
goto morework;
}
assert_pthread_mutex_locked(&cur->queuelock);
fatal_on(cur->queue_head != NULL, "Worker queue not empty at exit\n");
cur->hosts_seen = cur->ht->load;
destroy_timerlists(cur->tlist);
destroy_hashtable(cur->ht);
return NULL;
}