void low_latency_iocp_context::run_impl(bool& stopFlag)

in source/win32/low_latency_iocp_context.cpp [115:255]


  // Runs the I/O event loop on the calling thread until 'stopFlag' is true.
  //
  // Each pass of the loop:
  //   1. Drains readyQueue_, invoking each task's callback.
  //   2. Polls the operations in pollQueue_, whose I/O may already have
  //      finished before the matching IOCP notification was dequeued.
  //   3. If the remote queue is considered active ('shouldCheckRemoteQueue'),
  //      tries to pull work submitted by other threads via
  //      try_dequeue_remote_work().
  //   4. Dequeues up to 'completionBufferSize' IOCP entries with
  //      NtRemoveIoCompletionEx(). The call is non-blocking (zero timeout)
  //      while the remote queue is active. It waits with no timeout once the
  //      remote queue has been marked inactive.
  //   5. Dispatches the retrieved completion entries.
  //
  // Control flow uses 'goto' to restart at an earlier stage without
  // re-testing the outer 'while' condition. Instead, 'stopFlag' is
  // re-checked right after every drain of the ready queue.
  //
  // Parameters:
  //   stopFlag - read at the top of each outer iteration and after each
  //              drain of the ready queue; the loop returns once it is true.
  //              Per the in-loop comment, a stop request is expected to be
  //              processed by a task run from the ready queue.
  //
  // Throws:
  //   std::system_error (via throw_) if NtRemoveIoCompletionEx() fails with
  //   a status other than STATUS_TIMEOUT. Exceptions escaping a task
  //   callback are not caught here.
  //
  // Only one thread may be inside run_impl() at a time, and re-entrant calls
  // are not allowed; both are checked with UNIFEX_ASSERT via
  // 'activeThreadId_'.
  void low_latency_iocp_context::run_impl(bool& stopFlag) {
    ntapi::LARGE_INTEGER zero;
    zero.QuadPart = 0;

    // A zero timeout makes NtRemoveIoCompletionEx() return immediately
    // (STATUS_TIMEOUT when nothing is queued). A null timeout pointer makes
    // it wait indefinitely.
    const ntapi::PLARGE_INTEGER zeroTimeout = &zero;
    const ntapi::PLARGE_INTEGER infiniteTimeout = nullptr;

    // Record this thread as the one running the loop. No thread (including
    // this one, i.e. no re-entrant call) may already be recorded.
    [[maybe_unused]] auto prevId = activeThreadId_.exchange(
        std::this_thread::get_id(), std::memory_order_relaxed);
    UNIFEX_ASSERT(prevId == std::thread::id());

    // Clear the recorded thread id when leaving run_impl().
    scope_guard resetActiveThreadIdOnExit = [&]() noexcept {
      activeThreadId_.store(std::thread::id(), std::memory_order_relaxed);
    };

    // Scratch buffer that receives the completion entries returned by a
    // single NtRemoveIoCompletionEx() call.
    constexpr std::uint32_t completionBufferSize = 128;
    ntapi::FILE_IO_COMPLETION_INFORMATION
        completionBuffer[completionBufferSize];
    std::memset(&completionBuffer, 0, sizeof(completionBuffer));

    // true  -> the remote queue is considered active: poll it on each pass
    //          and call NtRemoveIoCompletionEx() without blocking.
    // false -> the remote queue has been marked inactive, so remote threads
    //          post a wake-up packet to the IOCP when they enqueue work, and
    //          it is safe to block indefinitely.
    bool shouldCheckRemoteQueue = true;

    // Make sure that we leave the remote queue in a
    // state such that the 'shouldCheckRemoteQueue = true'
    // is a valid initial state for the next caller to run().
    scope_guard ensureRemoteQueueActiveOnExit = [&]() noexcept {
      if (!shouldCheckRemoteQueue) {
        (void)remoteQueue_.try_mark_active();
      }
    };

    while (!stopFlag) {
process_ready_queue:
      // Process/drain all ready-to-run work.
      // empty() is re-tested after every callback, so anything a callback
      // adds to readyQueue_ is also run before moving on.
      while (!readyQueue_.empty()) {
        operation_base* task = readyQueue_.pop_front();
        task->callback(task);
      }

      // Check whether the stop request was processed
      // when draining the ready queue.
      if (stopFlag) {
        break;
      }

      // Check the poll-queue for I/O operations that might have completed
      // already but where we have not yet seen the IOCP event.
      //
      // Every operation is popped whether or not it has completed. Ones still
      // in flight are not re-queued; they are finished later by the IOCP
      // completion path below.

      while (!pollQueue_.empty()) {
        io_operation* op = static_cast<io_operation*>(pollQueue_.pop_front());
        auto* state = op->ioState;
        UNIFEX_ASSERT(state->pendingCompletionNotifications > 0);
        UNIFEX_ASSERT(!state->completed);

        if (poll_is_complete(*state)) {
          // Completed before we received any notifications via IOCP.
          // Schedule it to resume now, before polling any other I/Os
          // to give those other I/Os time to complete.
          // Setting 'completed' stops the IOCP path below from scheduling
          // the parent a second time.
          state->completed = true;
          schedule_local(state->parent);
          goto process_ready_queue;
        }
      }

process_remote_queue:
      // Pull work posted by other threads and run it before looking at IOCP
      // events. (try_dequeue_remote_work() presumably returns true when it
      // moved work onto the ready queue - confirm.)
      if (shouldCheckRemoteQueue) {
        if (try_dequeue_remote_work()) {
          goto process_ready_queue;
        }
      }

      // Now check if there are any IOCP events.
get_iocp_entries:
      // Non-blocking while the remote queue is active. Once it has been
      // marked inactive, this waits with no timeout (the only way to be
      // woken for remote work is then the wake-up packet handled below).
      const ntapi::BOOLEAN alertable = FALSE;
      ntapi::ULONG completionEntriesRetrieved = 0;
      ntapi::NTSTATUS ntstat = ntapi::NtRemoveIoCompletionEx(
          iocp_.get(),
          completionBuffer,
          completionBufferSize,
          &completionEntriesRetrieved,
          shouldCheckRemoteQueue ? zeroTimeout : infiniteTimeout,
          alertable);
      if (ntstat == STATUS_TIMEOUT) {
        UNIFEX_ASSERT(shouldCheckRemoteQueue);

        // Previous call was non-blocking.
        // About to transition to blocking-call, but first need to
        // mark remote queue inactive so that remote threads know
        // they need to post an event to wake this thread up.
        if (remoteQueue_.try_mark_inactive()) {
          // Retry the same call, now with the infinite timeout.
          shouldCheckRemoteQueue = false;
          goto get_iocp_entries;
        } else {
          // The queue could not be marked inactive (presumably because
          // remote work arrived meanwhile), so dequeue it instead of
          // blocking.
          goto process_remote_queue;
        }
      } else if (!ntapi::ntstatus_success(ntstat)) {
        // Any other failure status: convert the NTSTATUS to a Win32 error
        // code and throw.
        DWORD errorCode = ntapi::RtlNtStatusToDosError(ntstat);
        throw_(std::system_error{
            static_cast<int>(errorCode),
            std::system_category(),
            "NtRemoveIoCompletionEx()"});
      }

      // Process completion-entries we received from the OS.
      // NOTE(review): 'ULONG' is unqualified here while the rest of the
      // function uses 'ntapi::ULONG'.
      for (ULONG i = 0; i < completionEntriesRetrieved; ++i) {
        ntapi::FILE_IO_COMPLETION_INFORMATION& entry = completionBuffer[i];
        if (entry.ApcContext != nullptr) {
          // A non-null ApcContext is an I/O completion; its value is the
          // address of the operation's IO_STATUS_BLOCK.
          ntapi::IO_STATUS_BLOCK* iosb =
              reinterpret_cast<ntapi::IO_STATUS_BLOCK*>(entry.ApcContext);
          // TODO: Do we need to store the error-code/bytes-transferred here?
          // Is entry.IoStatusBlock just a copy of what is already in 'iosb'.
          // Should we be copying entry.IoStatusBlock to '*iosb'?

          UNIFEX_ASSERT(iosb->Status != STATUS_PENDING);

          vectored_io_state* state = to_io_state(iosb);

          // An I/O state may expect several completion notifications
          // (presumably one per submitted I/O); only the last one finishes it.
          UNIFEX_ASSERT(state->pendingCompletionNotifications > 0);
          if (--state->pendingCompletionNotifications == 0) {
            // This was the last pending notification for this state.
            if (state->parent != nullptr) {
              // An operation is still attached to this state.
              // Notify it if we hadn't already done so.
              // NOTE(review): if the poll path already completed it, nothing
              // happens here; presumably the parent checks
              // pendingCompletionNotifications itself before releasing the
              // state - confirm.
              if (!state->completed) {
                state->completed = true;
                schedule_local(state->parent);
              }
            } else {
              // Otherwise the operation has already detached
              // from this I/O state and so we can immediately
              // return it to the pool.
              release_io_state(state);
            }
          }
        } else {
          // A remote-thread wake-up notification.
          // The remote queue may now hold work, so resume polling it
          // (and stop blocking in NtRemoveIoCompletionEx()).
          shouldCheckRemoteQueue = true;
        }
      }
      // Loop back. Work scheduled above via schedule_local() is expected to
      // run when the ready queue is drained on the next pass.
    }
  }