void io_uring_context::acquire_completion_queue_items()

in source/linux/io_uring_context.cpp [545:631]


// Drains every completion-queue entry that is currently visible.
//
// Entries tagged with one of the context's internal user_data values
// (remote-queue wakeup, timer, timer removal) are handled inline. For every
// other entry, user_data is interpreted as a pointer to a completion_base:
// the entry's result is stored into it and it is appended to a temporary
// queue that is handed to schedule_local() once the whole batch has been
// walked. Finally the new head index is published and cqPendingCount_ is
// reduced by the number of entries consumed.
//
// Calls std::terminate() if the remote-queue wakeup reports an error or if
// reading the eventfd fails.
void io_uring_context::acquire_completion_queue_items() noexcept {
  // Use 'relaxed' load for the head since it should only ever
  // be modified by the current thread.
  std::uint32_t cqHead = cqHead_->load(std::memory_order_relaxed);
  std::uint32_t cqTail = cqTail_->load(std::memory_order_acquire);
  LOGX("completion queue head = %u, tail = %u\n", cqHead, cqTail);

  if (cqHead != cqTail) {
    const auto mask = cqMask_;
    // Unsigned subtraction, so the count stays correct even when the
    // 32-bit indices have wrapped around.
    const auto count = cqTail - cqHead;
    UNIFEX_ASSERT(count <= cqEntryCount_);

    LOGX("got %u completions\n", count);

    operation_queue completionQueue;

    for (std::uint32_t i = 0; i < count; ++i) {
      auto& cqe = cqEntries_[(cqHead + i) & mask];

      if (cqe.user_data == remote_queue_event_user_data) {
        LOG("got remote queue wakeup");
        if (cqe.res < 0) {
          LOGX("remote queue wakeup failed err: %i\n", cqe.res);

          // The remote-queue wakeup operation reported an error.
          // TODO: What to do here?
          std::terminate();
        }

        // Read the eventfd to clear the signal.
        __u64 buffer;
        ssize_t bytesRead =
            read(remoteQueueEventFd_.get(), &buffer, sizeof(buffer));
        if (bytesRead < 0) {
          // read() failed
          [[maybe_unused]] int errorCode = errno;
          LOGX("read on eventfd failed with %i\n", errorCode);

          std::terminate();
        }

        UNIFEX_ASSERT(bytesRead == sizeof(buffer));

        // Skip processing this item and let the loop check
        // for the remote-queued items next time around.
        remoteQueueReadSubmitted_ = false;
        continue;
      } else if (cqe.user_data == timer_user_data()) {
        LOGX("got timer completion result %i\n", cqe.res);
        UNIFEX_ASSERT(activeTimerCount_ > 0);
        --activeTimerCount_;

        LOGX("now %u active timers\n", activeTimerCount_);
        // io_uring reports failures in cqe.res as negated errno values, so
        // a cancelled timeout completes with -ECANCELED. Comparing against
        // the positive constant would never match and would mark the timers
        // dirty on every timer completion, cancelled or not.
        if (cqe.res != -ECANCELED) {
          LOG("timer not cancelled, marking timers as dirty");
          timersAreDirty_ = true;
        }

        if (activeTimerCount_ == 0) {
          LOG("no more timers, resetting current due time");
          currentDueTime_.reset();
        }
        continue;
      } else if (cqe.user_data == remove_timer_user_data()) {
        // Ignore timer cancellation completion.
        continue;
      }

      // Any other entry carries a completion_base pointer in user_data.
      auto& completionState = *reinterpret_cast<completion_base*>(
          static_cast<std::uintptr_t>(cqe.user_data));

      // Save the result in the completion state.
      completionState.result_ = cqe.res;

      // Add it to a temporary queue of newly completed items.
      completionQueue.push_back(&completionState);
    }

    schedule_local(std::move(completionQueue));

    // Mark those completion queue entries as consumed.
    cqHead_->store(cqTail, std::memory_order_release);
    cqPendingCount_ -= count;
  }
}