void kqueue_reactor::run(long usec, op_queue<operation>& ops)

in demo_example/asio/asio/detail/impl/kqueue_reactor.ipp [427:534]

void kqueue_reactor::run(long usec, op_queue<operation>& ops)
{
  mutex::scoped_lock lock(mutex_);

  // Determine how long to block while waiting for events.
  timespec timeout_buf = { 0, 0 };
  timespec* timeout = usec ? get_timeout(usec, timeout_buf) : &timeout_buf;
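  // (usec == 0 requests an immediate poll; otherwise get_timeout() fills
  // in timeout_buf, clamping negative or overlong waits to an internal
  // maximum.)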

  lock.unlock();
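  // The lock is not held while blocked in kevent() below, so other threads
  // may register descriptors or interrupt the wait in the meantime.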

  // Block on the kqueue descriptor.
  struct kevent events[128];
  int num_events = kevent(kqueue_fd_, 0, 0, events, 128, timeout);
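  // (kevent() returns the number of events delivered, or -1 on error, e.g.
  // when the wait is interrupted by a signal; a negative count simply
  // skips the loops below.)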

#if defined(ASIO_ENABLE_HANDLER_TRACKING)
  // Trace the waiting events.
  for (int i = 0; i < num_events; ++i)
  {
    void* ptr = reinterpret_cast<void*>(events[i].udata);
    if (ptr != &interrupter_)
    {
      unsigned event_mask = 0;
      switch (events[i].filter)
      {
      case EVFILT_READ:
        event_mask |= ASIO_HANDLER_REACTOR_READ_EVENT;
        break;
      case EVFILT_WRITE:
        event_mask |= ASIO_HANDLER_REACTOR_WRITE_EVENT;
        break;
      }
      if ((events[i].flags & (EV_ERROR | EV_OOBAND)) != 0)
        event_mask |= ASIO_HANDLER_REACTOR_ERROR_EVENT;
      ASIO_HANDLER_REACTOR_EVENTS((context(),
            reinterpret_cast<uintmax_t>(ptr), event_mask));
    }
  }
#endif // defined(ASIO_ENABLE_HANDLER_TRACKING)

  // Dispatch the waiting events.
  for (int i = 0; i < num_events; ++i)
  {
    void* ptr = reinterpret_cast<void*>(events[i].udata);
    if (ptr == &interrupter_)
    {
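      // An event on the interrupter means another thread wants the reactor
      // to wake up (e.g. because new work was posted). Clear its readiness
      // so subsequent interrupts are observed.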
      interrupter_.reset();
    }
    else
    {
      descriptor_state* descriptor_data = static_cast<descriptor_state*>(ptr);
      mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);
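
      // num_kevents_ tracks how many filters are currently registered for
      // this descriptor: 1 when only EVFILT_READ is registered, 2 once
      // EVFILT_WRITE has been added as well.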

      if (events[i].filter == EVFILT_WRITE
          && descriptor_data->num_kevents_ == 2
          && descriptor_data->op_queue_[write_op].empty())
      {
        // Some descriptor types, like serial ports, don't seem to support
        // EV_CLEAR with EVFILT_WRITE. Since we have no pending write
        // operations we'll remove the EVFILT_WRITE registration here so that
        // we don't end up in a tight spin.
        struct kevent delete_events[1];
        ASIO_KQUEUE_EV_SET(&delete_events[0],
            descriptor_data->descriptor_, EVFILT_WRITE, EV_DELETE, 0, 0, 0);
        ::kevent(kqueue_fd_, delete_events, 1, 0, 0, 0);
        descriptor_data->num_kevents_ = 1;
      }

      // Exception operations must be processed first to ensure that any
      // out-of-band data is read before normal data.
#if defined(__NetBSD__)
      static const unsigned int filter[max_ops] =
#else
      static const int filter[max_ops] =
#endif
        { EVFILT_READ, EVFILT_WRITE, EVFILT_READ };
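      // (read_op and except_op both map to EVFILT_READ: kqueue reports
      // out-of-band data on the read filter, flagged with EV_OOBAND, which
      // is why the check below requires that flag for except_op.)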
      for (int j = max_ops - 1; j >= 0; --j)
      {
        if (events[i].filter == filter[j])
        {
          if (j != except_op || events[i].flags & EV_OOBAND)
          {
            while (reactor_op* op = descriptor_data->op_queue_[j].front())
            {
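              // Operations are retired from the queue until one cannot make
              // progress. An EV_ERROR event carries an error code in the
              // kevent's data field, which is used to fail the queued
              // operation.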
              if (events[i].flags & EV_ERROR)
              {
                op->ec_ = asio::error_code(
                    static_cast<int>(events[i].data),
                    asio::error::get_system_category());
                descriptor_data->op_queue_[j].pop();
                ops.push(op);
              }
              if (op->perform())
              {
                descriptor_data->op_queue_[j].pop();
                ops.push(op);
              }
              else
                break;
            }
          }
        }
      }
    }
  }

  lock.lock();
  timer_queues_.get_ready_timers(ops);
}
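
The register/wait/dispatch pattern that run() builds on can be illustrated
with a small standalone program. The sketch below is not Asio code and all
names in it are hypothetical; it assumes a platform where kevent's udata
field is a void* (macOS and FreeBSD; the ASIO_KQUEUE_EV_SET macro used
above exists to paper over NetBSD's intptr_t variant). It registers a
pipe's read end with EVFILT_READ, blocks in kevent() with a timeout, and
recovers per-descriptor state through udata, just as run() recovers its
descriptor_state pointers.

// kqueue_sketch.cpp - minimal, illustrative kqueue loop (not Asio code).
// Build on macOS/FreeBSD with: c++ -std=c++11 kqueue_sketch.cpp
#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>
#include <unistd.h>
#include <cstdio>

struct my_state { const char* name; };  // stand-in for descriptor_state

int main()
{
  int fds[2];
  if (pipe(fds) != 0)
    return 1;

  int kq = kqueue();
  if (kq == -1)
    return 1;

  // Register the pipe's read end, storing a state pointer in udata, much
  // as the reactor does when a descriptor is registered.
  my_state state = { "pipe-read" };
  struct kevent reg;
  EV_SET(&reg, fds[0], EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, &state);
  if (kevent(kq, &reg, 1, 0, 0, 0) == -1)
    return 1;

  // Make the descriptor ready, then wait with a timeout, as run() does
  // with its computed timespec.
  (void)write(fds[1], "x", 1);
  struct kevent events[16];
  timespec timeout = { 5, 0 };
  int num_events = kevent(kq, 0, 0, events, 16, &timeout);

  // Dispatch: recover the per-descriptor state from udata.
  for (int i = 0; i < num_events; ++i)
  {
    my_state* s = static_cast<my_state*>(events[i].udata);
    std::printf("event on %s: %ld byte(s) readable\n",
        s->name, static_cast<long>(events[i].data));
  }

  close(kq);
  close(fds[0]);
  close(fds[1]);
  return 0;
}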