void io_epoll_context::acquire_completion_queue_items()

in source/linux/io_epoll_context.cpp [250:329]


void io_epoll_context::acquire_completion_queue_items() {

  LOG("epoll_wait()");

  epoll_event completions[io_epoll_max_event_count];
  int result = epoll_wait(
    epollFd_.get(),
    completions,
    io_epoll_max_event_count,
    localQueue_.empty() ? -1 : 0);
  if (result < 0) {
    int errorCode = errno;
    throw_(std::system_error{errorCode, std::system_category(), "epoll_wait"});
  }
  std::uint32_t count = result;

  LOGX("got %u completions\n", count);

  operation_queue completionQueue;

  for (std::uint32_t i = 0; i < count; ++i) {
    auto& completed = completions[i];

    if (completed.data.ptr == remote_queue_event_user_data) {
      LOG("got remote queue wakeup");

      // Read the eventfd to clear the signal.
      std::uint64_t buffer;
      ssize_t bytesRead =
          read(remoteQueueEventFd_.get(), &buffer, sizeof(buffer));
      if (bytesRead < 0) {
        // read() failed
        [[maybe_unused]] int errorCode = errno;
        LOGX("read on eventfd failed with %i\n", errorCode);

        std::terminate();
      }

      UNIFEX_ASSERT(bytesRead == sizeof(buffer));

      // Skip processing this item and let the loop check
      // for the remote-queued items next time around.
      remoteQueueReadSubmitted_ = false;
      continue;
    } else if (completed.data.ptr == timer_user_data()) {
      LOG("got timer wakeup");
      currentDueTime_.reset();
      timersAreDirty_ = true;

      // Read the eventfd to clear the signal.
      std::uint64_t buffer;
      ssize_t bytesRead =
          read(timerFd_.get(), &buffer, sizeof(buffer));
      if (bytesRead < 0) {
        // read() failed
        [[maybe_unused]] int errorCode = errno;
        LOGX("read on timerfd failed with %i\n", errorCode);

        std::terminate();
      }

      UNIFEX_ASSERT(bytesRead == sizeof(buffer));
      continue;
    }

    LOGX("completion event %i\n", completed.events);
    auto& completionState = *reinterpret_cast<completion_base*>(completed.data.ptr);

    UNIFEX_ASSERT(completionState.enqueued_.load() == 0);
    ++completionState.enqueued_;

    // Save the result in the completion state.
    // completionState.result_ = cqe.res;

    // Add it to a temporary queue of newly completed items.
    completionQueue.push_back(&completionState);
  }

  schedule_local(std::move(completionQueue));
}