in source/linux/io_uring_context.cpp [394:475]
void io_uring_context::run_impl(const bool& shouldStop) {
LOG("run loop started");
auto* oldContext = std::exchange(currentThreadContext, this);
scope_guard g = [=]() noexcept {
std::exchange(currentThreadContext, oldContext);
LOG("run loop exited");
};
while (true) {
// Dequeue and process local queue items (ready to run)
execute_pending_local();
if (shouldStop) {
break;
}
// Check for any new completion-queue items.
acquire_completion_queue_items();
if (timersAreDirty_) {
update_timers();
}
// Check for remotely-queued items.
// Only do this if we haven't submitted a poll operation for the
// completion queue - in which case we'll just wait until we receive the
// completion-queue item).
if (!remoteQueueReadSubmitted_) {
acquire_remote_queued_items();
}
// Process additional I/O requests that were waiting for
// additional space either in the submission queue or the completion queue.
while (!pendingIoQueue_.empty() && can_submit_io()) {
auto* item = pendingIoQueue_.pop_front();
item->execute_(item);
}
if (localQueue_.empty() || sqUnflushedCount_ > 0) {
const bool isIdle = sqUnflushedCount_ == 0 && localQueue_.empty();
if (isIdle) {
if (!remoteQueueReadSubmitted_) {
LOG("try_register_remote_queue_notification()");
remoteQueueReadSubmitted_ = try_register_remote_queue_notification();
}
}
int minCompletionCount = 0;
unsigned flags = 0;
if (isIdle &&
(remoteQueueReadSubmitted_ ||
pending_operation_count() == cqEntryCount_)) {
// No work to do until we receive a completion event.
minCompletionCount = 1;
flags = IORING_ENTER_GETEVENTS;
}
LOGX(
"io_uring_enter() - submit %u, wait for %i, pending %u\n",
sqUnflushedCount_,
minCompletionCount,
pending_operation_count());
int result = io_uring_enter(
iouringFd_.get(),
sqUnflushedCount_,
minCompletionCount,
flags,
nullptr);
if (result < 0) {
int errorCode = errno;
throw_(std::system_error{errorCode, std::system_category()});
}
LOG("io_uring_enter() returned");
sqUnflushedCount_ -= result;
cqPendingCount_ += result;
}
}
}