in worker.c [414:426]
/*
 * Flush every message currently available from a bucket's ncrx instance
 * and re-arm the bucket's ncrx callback.
 *
 * Each message returned by ncrx_next_msg() is passed to the output
 * pipeline (tagged with this worker's thread number and the bucket's
 * source, with NULL for the third argument) and then released with
 * free().  Once ncrx_next_msg() returns NULL, the time reported by
 * ncrx_invoke_process_at() is handed to schedule_ncrx_callback().
 *
 * @cur: worker on whose behalf the bucket is being drained
 * @bkt: bucket whose ncrx instance is drained and rescheduled
 */
static void drain_bucket_ncrx(struct ncrx_worker *cur, struct bucket *bkt)
{
	struct ncrx_msg *msg;
	uint64_t next_at;

	for (;;) {
		msg = ncrx_next_msg(bkt->ncrx);
		if (!msg)
			break;

		execute_output_pipeline(cur->thread_nr, &bkt->src, NULL, msg);
		/* The message is freed here once the pipeline call returns. */
		free(msg);
	}

	/* Query the next process time only after the queue has been emptied. */
	next_at = ncrx_invoke_process_at(bkt->ncrx);
	schedule_ncrx_callback(cur, bkt, next_at);
}