in worker.c [479:502]
static void consume_msgbuf(struct ncrx_worker *cur, struct msg_buf *buf)
{
struct bucket *ncrx_bucket;
ncrx_bucket = hlookup(cur->ht, &buf->src.sin6_addr);
if (!ncrx_bucket->ncrx) {
ncrx_bucket->ncrx = ncrx_create(&ncrx_param);
timerlist_init(&ncrx_bucket->timernode);
memcpy(&ncrx_bucket->src, &buf->src.sin6_addr,
sizeof(ncrx_bucket->src));
cur->ht->load++;
}
ncrx_bucket->last_seen = buf->rcv_time;
buf->buf[buf->rcv_bytes] = '\0';
if (!ncrx_process(buf->buf, now_mono_ms(), buf->rcv_time,
ncrx_bucket->ncrx)) {
drain_bucket_ncrx(cur, ncrx_bucket);
return;
}
execute_output_pipeline(cur->thread_nr, &ncrx_bucket->src, buf, NULL);
}