in threads.c [115:135]
/*
 * Shut down the listener threads one at a time and release their memory.
 *
 * Listeners are handled strictly in index order. For each one we set its
 * stop flag, send SIGUSR1 to the thread, join it, free its prequeues and
 * log its processed counter. After the last listener has been joined, the
 * sum of all processed counters is logged and the listeners array itself
 * is freed.
 *
 * The return values of pthread_kill() and pthread_join() are not checked.
 * ctl->listeners is left pointing at freed memory on return.
 */
static void stop_and_wait_for_listeners(struct tctl *ctl)
{
	uint64_t grand_total = 0;
	int idx = 0;

	while (idx < ctl->nr_listeners) {
		/*
		 * Raise the stop flag before signalling.
		 * NOTE(review): SIGUSR1 is presumably there to interrupt a
		 * blocking call so the thread notices the flag; confirm
		 * against the listener loop.
		 */
		ctl->listeners[idx].stop = 1;
		pthread_kill(ctl->listeners[idx].id, SIGUSR1);

		/* Block until this thread has exited before freeing its data. */
		pthread_join(ctl->listeners[idx].id, NULL);
		free(ctl->listeners[idx].prequeues);

		grand_total += ctl->listeners[idx].processed;
		log("Exiting listener %d queued %" PRIu64 " messages\n",
		    idx, ctl->listeners[idx].processed);

		idx++;
	}

	log("Total messages processed by listeners: %" PRIu64 "\n",
	    grand_total);

	free(ctl->listeners);
}