static void consume_msgbuf()

in worker.c [479:502]


static void consume_msgbuf(struct ncrx_worker *cur, struct msg_buf *buf)
{
	struct bucket *ncrx_bucket;

	ncrx_bucket = hlookup(cur->ht, &buf->src.sin6_addr);
	if (!ncrx_bucket->ncrx) {
		ncrx_bucket->ncrx = ncrx_create(&ncrx_param);
		timerlist_init(&ncrx_bucket->timernode);
		memcpy(&ncrx_bucket->src, &buf->src.sin6_addr,
				sizeof(ncrx_bucket->src));
		cur->ht->load++;
	}

	ncrx_bucket->last_seen = buf->rcv_time;

	buf->buf[buf->rcv_bytes] = '\0';
	if (!ncrx_process(buf->buf, now_mono_ms(), buf->rcv_time,
			ncrx_bucket->ncrx)) {
		drain_bucket_ncrx(cur, ncrx_bucket);
		return;
	}

	execute_output_pipeline(cur->thread_nr, &ncrx_bucket->src, buf, NULL);
}