static bool poller_do_epoll()

in c/src/proactor/epoll.c [2524:2681]


static bool poller_do_epoll(struct pn_proactor_t* p, tslot_t *ts, bool can_block) {
  // As the poller may have lots to do, be mindful of hogging the sched lock.  Release it when making kernel calls.
  assert(!p->resched_cutoff);
  assert(!p->sched_ready_first && !p->sched_ready_pending);
  int n_events;
  task_t *tsk;
  bool unpolled_work = false;

  while (true) {
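    // Each pass sizes the per-thread buffers, decides whether the upcoming
    // epoll_wait() may block, and records whether there is already work
    // (earmarks, resched tasks, ready list) that does not depend on new events.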
    assert(p->n_runnables == 0);
    if (p->thread_count > p->thread_capacity)
      grow_poller_bufs(p);
    p->next_runnable = 0;
    p->n_warm_runnables = 0;
    p->last_earmark = NULL;

    bool unfinished_earmarks = p->earmark_count > 0;
    if (unfinished_earmarks || p->resched_first)
      unpolled_work = true;
    bool epoll_immediate = unpolled_work || !can_block;

    // Determine if notify_poller() can be avoided.
    if (!epoll_immediate) {
      lock(&p->eventfd_mutex);
      if (p->ready_list_first) {
        unpolled_work = true;
        epoll_immediate = true;
      } else {
        // Poller may sleep.  Enable eventfd wakeup.
        p->ready_list_active = false;
      }
      unlock(&p->eventfd_mutex);
    }

    int timeout = (epoll_immediate) ? 0 : -1;
    p->poller_suspended = (timeout == -1);
    unlock(&p->sched_mutex);

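    // Kernel call made without the sched lock.  A timeout of 0 just collects any
    // pending events; -1 blocks until an fd event or an eventfd wakeup.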
    n_events = epoll_wait(p->epollfd, p->kevents, p->kevents_capacity, timeout);

    lock(&p->sched_mutex);
    p->poller_suspended = false;

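    // Back under the sched lock: take stock of work that accumulated while polling.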
    if (p->resched_first) {
      // Defer future resched tasks until next do_epoll()
      p->resched_cutoff = p->resched_last;
      assert(p->polled_resched_count == 0);
      p->polled_resched_count = p->resched_count;
      p->resched_count = 0;
      unpolled_work = true;
    }
    if (p->earmark_count > 0) {
      p->earmark_drain = true;
      unpolled_work = true;
    }
    // Take stock of ready list before any post_event()
    lock(&p->eventfd_mutex);
    schedule_ready_list(p);
    if (p->sched_ready_first)
      unpolled_work = true;
    if (++p->ready_list_generation == 0) // wrapping OK, but 0 means unset
      p->ready_list_generation = 1;
    unlock(&p->eventfd_mutex);

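    // Decide whether to return (non-blocking call with nothing to do), retry the
    // poll, or fall through and dispatch the new work.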
    if (n_events < 0) {
      if (errno != EINTR)
        perror("epoll_wait"); // TODO: proper log
      if (!can_block && !unpolled_work)
        return true;
      else
        continue;
    } else if (n_events == 0) {
      if (!can_block && !unpolled_work)
        return true;
      else {
        if (!epoll_immediate)
          perror("epoll_wait unexpected timeout"); // TODO: proper log
        if (!unpolled_work)
          continue;
      }
    }

    break;
  }

  // We have unpolled work or at least one new epoll event.
  // Remember tasks that together constitute new work.  See note at beginning about duplicates.

  lock(&p->eventfd_mutex);

  // Longest hold of eventfd_mutex.  The following must be quick with no external calls:
  // post_event(), make_runnable(), assign_thread(), earmark_thread().
  for (int i = 0; i < n_events; i++) {
    tsk = post_event(p, &p->kevents[i]);
    if (tsk)
      make_runnable(tsk);
  }
  unlock(&p->eventfd_mutex);

  if (n_events > 0)
    memset(p->kevents, 0, sizeof(struct epoll_event) * n_events);

  // The list of ready tasks can be very long.  There is a tradeoff between a slow
  // walk through the linked list looking for more warm pairings (while holding the
  // sched lock) and letting threads looking for work grab from the front.  Search
  // less when busy.  TODO: instrument an optimal value or heuristic.
  int warm_tries = p->suspend_list_count - p->n_warm_runnables;
  if (warm_tries < 0)
    warm_tries = 0;

  int max_runnables = p->runnables_capacity;
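  // Drain as many sched_ready tasks as fit in the runnables buffer, looking for
  // warm pairings along the way.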
  while (p->sched_ready_count && p->n_runnables < max_runnables && warm_tries) {
    task_t *ctsk = sched_ready_pop_front(p);
    tsk = post_ready(p, ctsk);
    warm_tries--;
    if (tsk)
      make_runnable(tsk);
  }
  // sched_ready list is now either consumed or partially deferred.
  // Allow next_runnable() to see any remaining sched_ready tasks.
  p->sched_ready_pending = p->sched_ready_count > 0;

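  // Likewise drain the resched tasks captured at resched_cutoff.  Clearing the
  // RESCHEDULE_PLACEHOLDER runner lets each task be scheduled again.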
  while (p->resched_cutoff && p->n_runnables < max_runnables && warm_tries) {
    task_t *ctsk = resched_pop_front(p);
    assert(ctsk->runner == RESCHEDULE_PLACEHOLDER && !ctsk->runnables_idx);
    ctsk->runner = NULL;  // Allow task to run again.
    warm_tries--;
    make_runnable(ctsk);
  }

  if (pni_immediate && !ts->task) {
    // Poller gets to run if possible
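    // Preference order: head of the runnables list, then a warm runnable (undoing
    // its pairing), then the most recently earmarked task.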
    task_t *ptsk;
    if (p->n_runnables) {
      assert(p->next_runnable == 0);
      ptsk = p->runnables[0];
      if (++p->next_runnable == p->n_runnables)
        p->n_runnables = 0;
    } else if (p->n_warm_runnables) {
      // The immediate option does not bother letting the other (warm) thread run this
      // task; the poller takes it and undoes the warm pairing.
      ptsk = p->warm_runnables[--p->n_warm_runnables];
      tslot_t *ts2 = ptsk->runner;
      ts2->prev_task = ts2->task = NULL;
      ptsk->runner = NULL;
    } else if (p->last_earmark) {
      ptsk = p->last_earmark->task;
      remove_earmark(p->last_earmark);
      if (p->earmark_count == 0)
        p->earmark_drain = false;
    } else {
      ptsk = NULL;
    }
    if (ptsk) {
      assign_thread(ts, ptsk);
    }
  }
  return false;
}
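
A note on the return value, as it follows from the body above: the function returns
true only when it was called with can_block == false and found neither epoll events
nor unpolled work; otherwise it returns false after queuing runnables (and, when
pni_immediate is set, possibly assigning a task directly to ts).  A minimal,
hypothetical caller sketch (not the actual next_event_batch logic) illustrating that
convention:

static task_t *poll_for_task(struct pn_proactor_t *p, tslot_t *ts, bool can_block) {
  // sched_mutex is assumed to be held, as poller_do_epoll() expects.
  if (poller_do_epoll(p, ts, can_block))
    return NULL;     // Non-blocking call and nothing to do.
  return ts->task;   // Non-NULL only if the pni_immediate path assigned work here.
}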