in source/linux/io_uring_context.cpp [545:631]
void io_uring_context::acquire_completion_queue_items() noexcept {
// Use 'relaxed' load for the head since it should only ever
// be modified by the current thread.
std::uint32_t cqHead = cqHead_->load(std::memory_order_relaxed);
std::uint32_t cqTail = cqTail_->load(std::memory_order_acquire);
LOGX("completion queue head = %u, tail = %u\n", cqHead, cqTail);
if (cqHead != cqTail) {
const auto mask = cqMask_;
const auto count = cqTail - cqHead;
UNIFEX_ASSERT(count <= cqEntryCount_);
operation_base head;
LOGX("got %u completions\n", count);
operation_queue completionQueue;
for (std::uint32_t i = 0; i < count; ++i) {
auto& cqe = cqEntries_[(cqHead + i) & mask];
if (cqe.user_data == remote_queue_event_user_data) {
LOG("got remote queue wakeup");
if (cqe.res < 0) {
LOGX("remote queue wakeup failed err: %i\n", cqe.res);
// readv() operation failed.
// TODO: What to do here?
std::terminate();
}
// Read the eventfd to clear the signal.
__u64 buffer;
ssize_t bytesRead =
read(remoteQueueEventFd_.get(), &buffer, sizeof(buffer));
if (bytesRead < 0) {
// read() failed
[[maybe_unused]] int errorCode = errno;
LOGX("read on eventfd failed with %i\n", errorCode);
std::terminate();
}
UNIFEX_ASSERT(bytesRead == sizeof(buffer));
// Skip processing this item and let the loop check
// for the remote-queued items next time around.
remoteQueueReadSubmitted_ = false;
continue;
} else if (cqe.user_data == timer_user_data()) {
LOGX("got timer completion result %i\n", cqe.res);
UNIFEX_ASSERT(activeTimerCount_ > 0);
--activeTimerCount_;
LOGX("now %u active timers\n", activeTimerCount_);
if (cqe.res != ECANCELED) {
LOG("timer not cancelled, marking timers as dirty");
timersAreDirty_ = true;
}
if (activeTimerCount_ == 0) {
LOG("no more timers, resetting current due time");
currentDueTime_.reset();
}
continue;
} else if (cqe.user_data == remove_timer_user_data()) {
// Ignore timer cancellation completion.
continue;
}
auto& completionState = *reinterpret_cast<completion_base*>(
static_cast<std::uintptr_t>(cqe.user_data));
// Save the result in the completion state.
completionState.result_ = cqe.res;
// Add it to a temporary queue of newly completed items.
completionQueue.push_back(&completionState);
}
schedule_local(std::move(completionQueue));
// Mark those completion queue entries as consumed.
cqHead_->store(cqTail, std::memory_order_release);
cqPendingCount_ -= count;
}
}