in source/win32/low_latency_iocp_context.cpp [115:255]
// Runs this context's event loop on the calling thread until 'stopFlag'
// is observed to be true.
//
// Each pass of the outer loop:
//   1. drains readyQueue_, invoking every task's callback;
//   2. walks pollQueue_, scheduling the parent of any operation whose I/O
//      state poll_is_complete() reports as finished;
//   3. while 'shouldCheckRemoteQueue' is set, calls
//      try_dequeue_remote_work();
//   4. dequeues completion entries with NtRemoveIoCompletionEx(), using a
//      zero timeout while the remote queue still has to be checked by this
//      thread and a null (infinite) timeout otherwise, then processes the
//      entries that were returned.
//
// stopFlag: read at the top of every pass and again right after the ready
//   queue has been drained. This function never assigns to it.
//
// Errors: if NtRemoveIoCompletionEx() returns a status that is neither
//   STATUS_TIMEOUT nor accepted by ntapi::ntstatus_success(), the status is
//   converted with RtlNtStatusToDosError() and passed to throw_() wrapped
//   in a std::system_error.
void low_latency_iocp_context::run_impl(bool& stopFlag) {
// Timeout arguments for NtRemoveIoCompletionEx(): a pointer to a zero
// value for the non-blocking call, nullptr for the blocking call.
ntapi::LARGE_INTEGER zero;
zero.QuadPart = 0;
const ntapi::PLARGE_INTEGER zeroTimeout = &zero;
const ntapi::PLARGE_INTEGER infiniteTimeout = nullptr;
// Record the calling thread's id in activeThreadId_. The assertion checks
// that the previous value was the default-constructed (no thread) id.
[[maybe_unused]] auto prevId = activeThreadId_.exchange(
std::this_thread::get_id(), std::memory_order_relaxed);
UNIFEX_ASSERT(prevId == std::thread::id());
// Resets activeThreadId_ back to the default id when the guard fires
// (presumably on scope exit; scope_guard is defined elsewhere).
scope_guard resetActiveThreadIdOnExit = [&]() noexcept {
activeThreadId_.store(std::thread::id(), std::memory_order_relaxed);
};
// Buffer that receives up to completionBufferSize entries per call to
// NtRemoveIoCompletionEx(); zero-filled before first use.
constexpr std::uint32_t completionBufferSize = 128;
ntapi::FILE_IO_COMPLETION_INFORMATION
completionBuffer[completionBufferSize];
std::memset(&completionBuffer, 0, sizeof(completionBuffer));
// While true, the loop calls try_dequeue_remote_work() itself and only
// issues non-blocking IOCP dequeues. It is cleared after
// remoteQueue_.try_mark_inactive() succeeds and set again when a
// completion entry with a null ApcContext (a wake-up) is received.
bool shouldCheckRemoteQueue = true;
// Make sure that we leave the remote queue in a
// state such that the 'shouldCheckRemoteQueue = true'
// is a valid initial state for the next caller to run().
scope_guard ensureRemoteQueueActiveOnExit = [&]() noexcept {
if (!shouldCheckRemoteQueue) {
// Result deliberately ignored.
(void)remoteQueue_.try_mark_active();
}
};
while (!stopFlag) {
// Jump target used after a polled operation has been scheduled and after
// try_dequeue_remote_work() returns true.
process_ready_queue:
// Process/drain all ready-to-run work.
while (!readyQueue_.empty()) {
operation_base* task = readyQueue_.pop_front();
task->callback(task);
}
// Check whether the stop request was processed
// when draining the ready queue.
if (stopFlag) {
break;
}
// Check the poll-queue for I/O operations that might have completed
// already but where we have not yet seen the IOCP event.
while (!pollQueue_.empty()) {
// Entries popped from pollQueue_ are treated as io_operation objects.
io_operation* op = static_cast<io_operation*>(pollQueue_.pop_front());
auto* state = op->ioState;
UNIFEX_ASSERT(state->pendingCompletionNotifications > 0);
UNIFEX_ASSERT(!state->completed);
// An operation that is not complete yet is simply dropped from the poll
// queue here; it is not re-queued by this loop.
// NOTE(review): presumably its completion is then picked up by the IOCP
// entry processing further down - confirm.
if (poll_is_complete(*state)) {
// Completed before we received any notifications via IOCP.
// Schedule it to resume now, before polling any other I/Os
// to give those other I/Os time to complete.
state->completed = true;
schedule_local(state->parent);
goto process_ready_queue;
}
}
// Jump target used when try_mark_inactive() fails below, so that the
// remote queue is checked again before another IOCP dequeue.
process_remote_queue:
if (shouldCheckRemoteQueue) {
if (try_dequeue_remote_work()) {
goto process_ready_queue;
}
}
// Now check if there are any IOCP events.
// Jump target used to re-issue the dequeue as a blocking call once the
// remote queue has been marked inactive.
get_iocp_entries:
const ntapi::BOOLEAN alertable = FALSE;
ntapi::ULONG completionEntriesRetrieved = 0;
// Non-blocking (zero timeout) while this thread is still responsible for
// checking the remote queue; blocking (null timeout) otherwise.
ntapi::NTSTATUS ntstat = ntapi::NtRemoveIoCompletionEx(
iocp_.get(),
completionBuffer,
completionBufferSize,
&completionEntriesRetrieved,
shouldCheckRemoteQueue ? zeroTimeout : infiniteTimeout,
alertable);
if (ntstat == STATUS_TIMEOUT) {
// A timeout is only expected from the zero-timeout call.
UNIFEX_ASSERT(shouldCheckRemoteQueue);
// Previous call was non-blocking.
// About to transition to blocking-call, but first need to
// mark remote queue inactive so that remote threads know
// they need to post an event to wake this thread up.
if (remoteQueue_.try_mark_inactive()) {
shouldCheckRemoteQueue = false;
goto get_iocp_entries;
} else {
// NOTE(review): presumably the queue could not be marked inactive
// because remote work was enqueued in the meantime - confirm against
// the remote queue implementation. Go back and try to dequeue it.
goto process_remote_queue;
}
} else if (!ntapi::ntstatus_success(ntstat)) {
// Convert the NTSTATUS to a Win32 error code for std::system_error.
DWORD errorCode = ntapi::RtlNtStatusToDosError(ntstat);
throw_(std::system_error{
static_cast<int>(errorCode),
std::system_category(),
"NtRemoveIoCompletionEx()"});
}
// Process completion-entries we received from the OS.
for (ULONG i = 0; i < completionEntriesRetrieved; ++i) {
ntapi::FILE_IO_COMPLETION_INFORMATION& entry = completionBuffer[i];
// A non-null ApcContext is interpreted as a pointer to the
// IO_STATUS_BLOCK of an I/O operation; a null one is a wake-up.
if (entry.ApcContext != nullptr) {
ntapi::IO_STATUS_BLOCK* iosb =
reinterpret_cast<ntapi::IO_STATUS_BLOCK*>(entry.ApcContext);
// TODO: Do we need to store the error-code/bytes-transferred here?
// Is entry.IoStatusBlock just a copy of what is already in 'iosb'.
// Should we be copying entry.IoStatusBlock to '*iosb'?
UNIFEX_ASSERT(iosb->Status != STATUS_PENDING);
// Look up the vectored_io_state associated with this IO_STATUS_BLOCK
// (to_io_state() is defined elsewhere).
vectored_io_state* state = to_io_state(iosb);
UNIFEX_ASSERT(state->pendingCompletionNotifications > 0);
// Consume one outstanding notification for this state.
if (--state->pendingCompletionNotifications == 0) {
// This was the last pending notification for this state.
if (state->parent != nullptr) {
// An operation is still attached to this state.
// Notify it if we hadn't already done so.
// ('completed' may already have been set by the poll-queue path
// above, which schedules the parent while notifications are
// still pending.)
if (!state->completed) {
state->completed = true;
schedule_local(state->parent);
}
} else {
// Otherwise the operation has already detached
// from this I/O state and so we can immediately
// return it to the pool.
release_io_state(state);
}
}
} else {
// A remote-thread wake-up notification.
// Resume checking the remote queue and using non-blocking dequeues.
shouldCheckRemoteQueue = true;
}
}
}
}