void epoll_reactor::run()

in demo_example/asio/asio/detail/impl/epoll_reactor.ipp [476:595]


void epoll_reactor::run(long usec, op_queue<operation>& ops)
{
  // This code relies on the fact that the scheduler queues the reactor task
  // behind all descriptor operations generated by this function. This means,
  // that by the time we reach this point, any previously returned descriptor
  // operations have already been dequeued. Therefore it is now safe for us to
  // reuse and return them for the scheduler to queue again.

  // Calculate timeout. Check the timer queues only if timerfd is not in use.
  int timeout;
  if (usec == 0)
    timeout = 0;
  else
  {
    timeout = (usec < 0) ? -1 : ((usec - 1) / 1000 + 1);
    if (timer_fd_ == -1)
    {
      mutex::scoped_lock lock(mutex_);
      timeout = get_timeout(timeout);
    }
  }

  // Block on the epoll descriptor.
  epoll_event events[128];
  int num_events = epoll_wait(epoll_fd_, events, 128, timeout);

#if defined(ASIO_ENABLE_HANDLER_TRACKING)
  // Trace the waiting events.
  for (int i = 0; i < num_events; ++i)
  {
    void* ptr = events[i].data.ptr;
    if (ptr == &interrupter_)
    {
      // Ignore.
    }
# if defined(ASIO_HAS_TIMERFD)
    else if (ptr == &timer_fd_)
    {
      // Ignore.
    }
# endif // defined(ASIO_HAS_TIMERFD)
    else
    {
      unsigned event_mask = 0;
      if ((events[i].events & EPOLLIN) != 0)
        event_mask |= ASIO_HANDLER_REACTOR_READ_EVENT;
      if ((events[i].events & EPOLLOUT))
        event_mask |= ASIO_HANDLER_REACTOR_WRITE_EVENT;
      if ((events[i].events & (EPOLLERR | EPOLLHUP)) != 0)
        event_mask |= ASIO_HANDLER_REACTOR_ERROR_EVENT;
      ASIO_HANDLER_REACTOR_EVENTS((context(),
            reinterpret_cast<uintmax_t>(ptr), event_mask));
    }
  }
#endif // defined(ASIO_ENABLE_HANDLER_TRACKING)

#if defined(ASIO_HAS_TIMERFD)
  bool check_timers = (timer_fd_ == -1);
#else // defined(ASIO_HAS_TIMERFD)
  bool check_timers = true;
#endif // defined(ASIO_HAS_TIMERFD)

  // Dispatch the waiting events.
  for (int i = 0; i < num_events; ++i)
  {
    void* ptr = events[i].data.ptr;
    if (ptr == &interrupter_)
    {
      // No need to reset the interrupter since we're leaving the descriptor
      // in a ready-to-read state and relying on edge-triggered notifications
      // to make it so that we only get woken up when the descriptor's epoll
      // registration is updated.

#if defined(ASIO_HAS_TIMERFD)
      if (timer_fd_ == -1)
        check_timers = true;
#else // defined(ASIO_HAS_TIMERFD)
      check_timers = true;
#endif // defined(ASIO_HAS_TIMERFD)
    }
#if defined(ASIO_HAS_TIMERFD)
    else if (ptr == &timer_fd_)
    {
      check_timers = true;
    }
#endif // defined(ASIO_HAS_TIMERFD)
    else
    {
      // The descriptor operation doesn't count as work in and of itself, so we
      // don't call work_started() here. This still allows the scheduler to
      // stop if the only remaining operations are descriptor operations.
      descriptor_state* descriptor_data = static_cast<descriptor_state*>(ptr);
      if (!ops.is_enqueued(descriptor_data))
      {
        descriptor_data->set_ready_events(events[i].events);
        ops.push(descriptor_data);
      }
      else
      {
        descriptor_data->add_ready_events(events[i].events);
      }
    }
  }

  if (check_timers)
  {
    mutex::scoped_lock common_lock(mutex_);
    timer_queues_.get_ready_timers(ops);

#if defined(ASIO_HAS_TIMERFD)
    if (timer_fd_ != -1)
    {
      itimerspec new_timeout;
      itimerspec old_timeout;
      int flags = get_timeout(new_timeout);
      timerfd_settime(timer_fd_, flags, &new_timeout, &old_timeout);
    }
#endif // defined(ASIO_HAS_TIMERFD)
  }
}