void io_uring_context::run_impl()

in source/linux/io_uring_context.cpp [394:475]


// Drives this context's event loop on the calling thread.
//
// Each iteration runs the ready local work, pulls in completion-queue and
// remotely-queued items, retries I/O that was parked waiting for queue space,
// and then calls io_uring_enter() to flush unsubmitted entries and/or wait
// for a completion.
//
// shouldStop is re-read once per iteration, immediately after the local queue
// has been drained; the loop exits the first time it is observed true.
// (Presumably it is flipped by work executed inside the loop - confirm
// against the caller.)
//
// Reports a std::system_error carrying errno through throw_() when
// io_uring_enter() fails with anything other than EINTR. An interrupted call
// is retried instead of being treated as a failure.
void io_uring_context::run_impl(const bool& shouldStop) {
  LOG("run loop started");

  // Publish this context as the calling thread's current context while the
  // loop runs. The guard puts the previous value back when `g` is destroyed
  // (scope_guard is expected to invoke the lambda on scope exit).
  auto* oldContext = std::exchange(currentThreadContext, this);
  scope_guard g = [=]() noexcept {
    currentThreadContext = oldContext;
    LOG("run loop exited");
  };

  while (true) {
    // Dequeue and process local queue items (ready to run)
    execute_pending_local();

    if (shouldStop) {
      break;
    }

    // Check for any new completion-queue items.
    acquire_completion_queue_items();

    if (timersAreDirty_) {
      update_timers();
    }

    // Check for remotely-queued items.
    // Only do this if we haven't submitted a poll operation for the
    // completion queue - in which case we'll just wait until we receive the
    // completion-queue item).
    if (!remoteQueueReadSubmitted_) {
      acquire_remote_queued_items();
    }

    // Process additional I/O requests that were waiting for
    // additional space either in the submission queue or the completion queue.
    while (!pendingIoQueue_.empty() && can_submit_io()) {
      auto* item = pendingIoQueue_.pop_front();
      item->execute_(item);
    }

    // Only enter the kernel when there is something to flush or there is no
    // local work left to run; otherwise go straight back to the local queue.
    if (localQueue_.empty() || sqUnflushedCount_ > 0) {
      // Idle: nothing to submit and nothing ready to run locally.
      const bool isIdle = sqUnflushedCount_ == 0 && localQueue_.empty();
      if (isIdle && !remoteQueueReadSubmitted_) {
        LOG("try_register_remote_queue_notification()");
        remoteQueueReadSubmitted_ = try_register_remote_queue_notification();
      }

      int minCompletionCount = 0;
      unsigned flags = 0;
      if (isIdle &&
          (remoteQueueReadSubmitted_ ||
           pending_operation_count() == cqEntryCount_)) {
        // No work to do until we receive a completion event.
        minCompletionCount = 1;
        flags = IORING_ENTER_GETEVENTS;
      }

      LOGX(
          "io_uring_enter() - submit %u, wait for %i, pending %u\n",
          sqUnflushedCount_,
          minCompletionCount,
          pending_operation_count());

      int result = io_uring_enter(
          iouringFd_.get(),
          sqUnflushedCount_,
          minCompletionCount,
          flags,
          nullptr);
      if (result < 0) {
        const int errorCode = errno;
        if (errorCode == EINTR) {
          // A signal interrupted the call. That is not a failure of the ring,
          // so leave the counters untouched and go round the loop again; the
          // next iteration re-checks shouldStop and retries the submit/wait.
          continue;
        }
        throw_(std::system_error{errorCode, std::system_category()});
      }

      LOG("io_uring_enter() returned");

      // `result` entries were accepted by the kernel: they are no longer
      // waiting to be flushed and now have completions outstanding.
      sqUnflushedCount_ -= result;
      cqPendingCount_ += result;
    }
  }
}