in c/src/proactor/epoll.c [2524:2681]
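// Poll for new work on behalf of the scheduler.  Gathers events from epoll plus any
// backlog on the ready, resched and earmark lists, converts them to runnable tasks,
// and (when pni_immediate) assigns one task to this thread.  Returns true only when
// the caller cannot block and no new work was found; false once work is scheduled.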
static bool poller_do_epoll(struct pn_proactor_t* p, tslot_t *ts, bool can_block) {
// A poller with lots to do must be mindful of hogging the sched lock. Release it when making kernel calls.
assert(!p->resched_cutoff);
assert(!p->sched_ready_first && !p->sched_ready_pending);
int n_events;
task_t *tsk;
bool unpolled_work = false;
while (true) {
assert(p->n_runnables == 0);
if (p->thread_count > p->thread_capacity)
grow_poller_bufs(p);
p->next_runnable = 0;
p->n_warm_runnables = 0;
p->last_earmark = NULL;
bool unfinished_earmarks = p->earmark_count > 0;
if (unfinished_earmarks || p->resched_first)
unpolled_work = true;
bool epoll_immediate = unpolled_work || !can_block;
// Determine if notify_poller() can be avoided.
if (!epoll_immediate) {
lock(&p->eventfd_mutex);
if (p->ready_list_first) {
unpolled_work = true;
epoll_immediate = true;
} else {
// Poller may sleep. Enable eventfd wakeup.
p->ready_list_active = false;
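// With ready_list_active false, the next task added to the ready list
// must write the eventfd to wake us from epoll_wait().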
}
unlock(&p->eventfd_mutex);
}
int timeout = (epoll_immediate) ? 0 : -1;
p->poller_suspended = (timeout == -1);
unlock(&p->sched_mutex);
n_events = epoll_wait(p->epollfd, p->kevents, p->kevents_capacity, timeout);
lock(&p->sched_mutex);
p->poller_suspended = false;
if (p->resched_first) {
// Defer future resched tasks until next do_epoll()
p->resched_cutoff = p->resched_last;
assert(p->polled_resched_count == 0);
p->polled_resched_count = p->resched_count;
p->resched_count = 0;
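// polled_resched_count presumably tracks the drain of this snapshot as the
// deferred tasks are popped.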
unpolled_work = true;
}
if (p->earmark_count > 0) {
p->earmark_drain = true;
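// Leftover earmarks from before this poll; presumably their warm threads are
// busy, so let any thread drain them.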
unpolled_work = true;
}
// Take stock of ready list before any post_event()
lock(&p->eventfd_mutex);
schedule_ready_list(p);
if (p->sched_ready_first)
unpolled_work = true;
if (++p->ready_list_generation == 0) // wrapping OK, but 0 means unset
p->ready_list_generation = 1;
unlock(&p->eventfd_mutex);
if (n_events < 0) {
if (errno != EINTR)
perror("epoll_wait"); // TODO: proper log
if (!can_block && !unpolled_work)
return true;
else
continue;
} else if (n_events == 0) {
if (!can_block && !unpolled_work)
return true;
else {
if (!epoll_immediate)
fprintf(stderr, "epoll_wait unexpected timeout\n"); // errno is not set on a timeout, so perror() would print stale state. TODO: proper log
if (!unpolled_work)
continue;
}
}
break;
}
// We have unpolled work or at least one new epoll event.
// Remember tasks that together constitute new work. See note at beginning about duplicates.
lock(&p->eventfd_mutex);
// Longest hold of eventfd_mutex. The following must be quick with no external calls:
// post_event(), make_runnable(), assign_thread(), earmark_thread().
for (int i = 0; i < n_events; i++) {
tsk = post_event(p, &p->kevents[i]);
if (tsk)
make_runnable(tsk);
}
unlock(&p->eventfd_mutex);
if (n_events > 0)
memset(p->kevents, 0, sizeof(struct epoll_event) * n_events);
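// Defensive scrub of the consumed events so stale data is never mistaken for a fresh event.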
// The list of ready tasks can be very long.  There is a tradeoff between a slow walk
// through the linked list looking for more warm pairings (while holding the sched lock)
// and letting threads looking for work grab from the front.  Search less when busy.
// TODO: instrument an optimal value or heuristic.
int warm_tries = p->suspend_list_count - p->n_warm_runnables;
if (warm_tries < 0)
warm_tries = 0;
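// warm_tries bounds the combined drain below: e.g. with 8 suspended threads and 2
// warm pairings already made, at most 6 more list entries are examined.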
int max_runnables = p->runnables_capacity;
while (p->sched_ready_count && p->n_runnables < max_runnables && warm_tries) {
task_t *ctsk = sched_ready_pop_front(p);
tsk = post_ready(p, ctsk);
warm_tries--;
if (tsk)
make_runnable(tsk);
}
// sched_ready list is now either consumed or partially deferred.
// Allow next_runnable() to see any remaining sched_ready tasks.
p->sched_ready_pending = p->sched_ready_count > 0;
while (p->resched_cutoff && p->n_runnables < max_runnables && warm_tries) {
task_t *ctsk = resched_pop_front(p);
assert(ctsk->runner == RESCHEDULE_PLACEHOLDER && !ctsk->runnables_idx);
ctsk->runner = NULL; // Allow task to run again.
warm_tries--;
make_runnable(ctsk);
}
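// Anything not drained above is left on its list for other threads looking for
// work to grab from the front.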
if (pni_immediate && !ts->task) {
// Poller gets to run if possible
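// Preference order: head of the runnables array, then an existing warm pairing,
// then the most recently earmarked task.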
task_t *ptsk;
if (p->n_runnables) {
assert(p->next_runnable == 0);
ptsk = p->runnables[0];
if (++p->next_runnable == p->n_runnables)
p->n_runnables = 0;
} else if (p->n_warm_runnables) {
// Immediate mode does not contemplate letting some other (warm) thread run the task instead
ptsk = p->warm_runnables[--p->n_warm_runnables];
tslot_t *ts2 = ptsk->runner;
ts2->prev_task = ts2->task = NULL;
ptsk->runner = NULL;
} else if (p->last_earmark) {
ptsk = p->last_earmark->task;
remove_earmark(p->last_earmark);
if (p->earmark_count == 0)
p->earmark_drain = false;
} else {
ptsk = NULL;
}
if (ptsk) {
assign_thread(ts, ptsk);
}
}
return false;
}