/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License Version 2.0 with LLVM Exceptions
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://llvm.org/LICENSE.txt
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <unifex/win32/low_latency_iocp_context.hpp>
#include <unifex/scope_guard.hpp>
#include <unifex/exception.hpp>
#include <atomic>
#include <cstring>
#include <random>
#include <system_error>
#ifndef WIN32_LEAN_AND_MEAN
#define WIN32_LEAN_AND_MEAN 1
#endif
#include <Windows.h>
namespace unifex::win32
{
namespace
{
static safe_handle create_iocp() {
HANDLE h = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, nullptr, 0, 1);
if (h == NULL) {
DWORD errorCode = ::GetLastError();
throw_(std::system_error{
static_cast<int>(errorCode),
std::system_category(),
"CreateIoCompletionPort()"});
}
return safe_handle{h};
}
} // namespace
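// Constructs the context with a fixed-size pool of 'maxIoOperations'
// vectored I/O states. Each state embeds the IO_STATUS_BLOCKs used by a
// single (possibly multi-chunk) read or write, so 'maxIoOperations' bounds
// the number of I/O operations that can be in flight at any one time.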
low_latency_iocp_context::low_latency_iocp_context(
std::size_t maxIoOperations)
: activeThreadId_(std::thread::id())
, iocp_(create_iocp())
, ioPoolSize_(maxIoOperations)
, ioPool_(std::make_unique<vectored_io_state[]>(maxIoOperations)) {
// Make sure the WinNT APIs are initialised and available.
// We only need to do this on construction, since the constructor is
// guaranteed to run before anything else needs to call them.
ntapi::ensure_initialised();
// Build the I/O free-list in reverse so that the front of the free-list
// is the first element of the array.
for (std::size_t i = 0; i < maxIoOperations; ++i) {
ioFreeList_.push_front(&ioPool_[maxIoOperations - 1 - i]);
}
}
low_latency_iocp_context::~low_latency_iocp_context() {
// Wait until the completion-event for every io-state has been
// received before we free the memory for the io-states.
std::size_t remaining = ioPoolSize_;
while (!ioFreeList_.empty()) {
(void)ioFreeList_.pop_front();
--remaining;
}
if (remaining > 0) {
constexpr std::uint32_t completionBufferSize = 128;
ntapi::FILE_IO_COMPLETION_INFORMATION
completionBuffer[completionBufferSize];
std::memset(&completionBuffer, 0, sizeof(completionBuffer));
do {
ntapi::ULONG numEntriesRemoved = 0;
ntapi::NTSTATUS ntstat = ntapi::NtRemoveIoCompletionEx(
iocp_.get(),
completionBuffer,
completionBufferSize,
&numEntriesRemoved,
NULL, // no timeout
FALSE); // not alertable
if (!ntapi::ntstatus_success(ntstat)) {
std::terminate();
}
for (std::uint32_t i = 0; i < numEntriesRemoved; ++i) {
auto& entry = completionBuffer[i];
if (entry.ApcContext != nullptr) {
ntapi::IO_STATUS_BLOCK* iosb =
reinterpret_cast<ntapi::IO_STATUS_BLOCK*>(entry.ApcContext);
auto* state = to_io_state(iosb);
UNIFEX_ASSERT(state->pendingCompletionNotifications > 0);
--state->pendingCompletionNotifications;
if (state->pendingCompletionNotifications == 0) {
--remaining;
}
}
}
} while (remaining > 0);
}
}
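// Main event loop. Repeatedly drains the ready-to-run queue, polls
// outstanding I/O states for completions we have not yet seen via IOCP,
// moves any cross-thread (remote) work onto the local queue, and then
// dequeues IOCP completion entries. A zero timeout is used while the remote
// queue is still active; once it has been marked inactive the wait is
// infinite, and remote producers must post a wake-up completion to unblock
// this thread.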
void low_latency_iocp_context::run_impl(bool& stopFlag) {
ntapi::LARGE_INTEGER zero;
zero.QuadPart = 0;
const ntapi::PLARGE_INTEGER zeroTimeout = &zero;
const ntapi::PLARGE_INTEGER infiniteTimeout = nullptr;
[[maybe_unused]] auto prevId = activeThreadId_.exchange(
std::this_thread::get_id(), std::memory_order_relaxed);
UNIFEX_ASSERT(prevId == std::thread::id());
scope_guard resetActiveThreadIdOnExit = [&]() noexcept {
activeThreadId_.store(std::thread::id(), std::memory_order_relaxed);
};
constexpr std::uint32_t completionBufferSize = 128;
ntapi::FILE_IO_COMPLETION_INFORMATION
completionBuffer[completionBufferSize];
std::memset(&completionBuffer, 0, sizeof(completionBuffer));
bool shouldCheckRemoteQueue = true;
// Make sure that we leave the remote queue in a state such that
// 'shouldCheckRemoteQueue = true' is a valid initial state for the
// next call to run().
scope_guard ensureRemoteQueueActiveOnExit = [&]() noexcept {
if (!shouldCheckRemoteQueue) {
(void)remoteQueue_.try_mark_active();
}
};
while (!stopFlag) {
process_ready_queue:
// Process/drain all ready-to-run work.
while (!readyQueue_.empty()) {
operation_base* task = readyQueue_.pop_front();
task->callback(task);
}
// Check whether the stop request was processed
// when draining the ready queue.
if (stopFlag) {
break;
}
// Check the poll-queue for I/O operations that might have completed
// already but where we have not yet seen the IOCP event.
while (!pollQueue_.empty()) {
io_operation* op = static_cast<io_operation*>(pollQueue_.pop_front());
auto* state = op->ioState;
UNIFEX_ASSERT(state->pendingCompletionNotifications > 0);
UNIFEX_ASSERT(!state->completed);
if (poll_is_complete(*state)) {
// Completed before we received any notifications via IOCP.
// Schedule it to resume now, before polling any other I/Os
// to give those other I/Os time to complete.
state->completed = true;
schedule_local(state->parent);
goto process_ready_queue;
}
}
process_remote_queue:
if (shouldCheckRemoteQueue) {
if (try_dequeue_remote_work()) {
goto process_ready_queue;
}
}
// Now check if there are any IOCP events.
get_iocp_entries:
const ntapi::BOOLEAN alertable = FALSE;
ntapi::ULONG completionEntriesRetrieved = 0;
ntapi::NTSTATUS ntstat = ntapi::NtRemoveIoCompletionEx(
iocp_.get(),
completionBuffer,
completionBufferSize,
&completionEntriesRetrieved,
shouldCheckRemoteQueue ? zeroTimeout : infiniteTimeout,
alertable);
if (ntstat == STATUS_TIMEOUT) {
UNIFEX_ASSERT(shouldCheckRemoteQueue);
// Previous call was non-blocking.
// About to transition to blocking-call, but first need to
// mark remote queue inactive so that remote threads know
// they need to post an event to wake this thread up.
if (remoteQueue_.try_mark_inactive()) {
shouldCheckRemoteQueue = false;
goto get_iocp_entries;
} else {
goto process_remote_queue;
}
} else if (!ntapi::ntstatus_success(ntstat)) {
DWORD errorCode = ntapi::RtlNtStatusToDosError(ntstat);
throw_(std::system_error{
static_cast<int>(errorCode),
std::system_category(),
"NtRemoveIoCompletionEx()"});
}
// Process completion-entries we received from the OS.
for (ULONG i = 0; i < completionEntriesRetrieved; ++i) {
ntapi::FILE_IO_COMPLETION_INFORMATION& entry = completionBuffer[i];
if (entry.ApcContext != nullptr) {
ntapi::IO_STATUS_BLOCK* iosb =
reinterpret_cast<ntapi::IO_STATUS_BLOCK*>(entry.ApcContext);
// TODO: Do we need to store the error-code/bytes-transferred here?
// Is entry.IoStatusBlock just a copy of what is already in 'iosb'?
// Should we be copying entry.IoStatusBlock to '*iosb'?
UNIFEX_ASSERT(iosb->Status != STATUS_PENDING);
vectored_io_state* state = to_io_state(iosb);
UNIFEX_ASSERT(state->pendingCompletionNotifications > 0);
if (--state->pendingCompletionNotifications == 0) {
// This was the last pending notification for this state.
if (state->parent != nullptr) {
// An operation is still attached to this state.
// Notify it if we hadn't already done so.
if (!state->completed) {
state->completed = true;
schedule_local(state->parent);
}
} else {
// Otherwise the operation has already detached
// from this I/O state and so we can immediately
// return it to the pool.
release_io_state(state);
}
}
} else {
// A remote-thread wake-up notification.
shouldCheckRemoteQueue = true;
}
}
}
}
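// Moves any work that other threads have enqueued onto this thread's
// ready-to-run queue. Returns false if the remote queue was empty.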
bool low_latency_iocp_context::try_dequeue_remote_work() noexcept {
auto items = remoteQueue_.dequeue_all_reversed();
if (items.empty()) {
return false;
}
// Note that this ends up enqueueing entries in reverse.
// TODO: Modify this so it enqueues items in a way that
// preserves order.
do {
schedule_local(items.pop_front());
} while (!items.empty());
return true;
}
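// Returns true if every IO_STATUS_BLOCK belonging to this vectored I/O
// state has left the STATUS_PENDING state, i.e. the whole operation has
// completed even though the corresponding IOCP notifications may not have
// arrived yet.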
bool low_latency_iocp_context::poll_is_complete(
vectored_io_state& state) noexcept {
// TODO: Double-check the memory barriers/atomics we need here to ensure
// that this read is actually performed, does not tear, and has the
// appropriate memory semantics.
for (std::size_t i = 0; i < state.operationCount; ++i) {
// Access through a volatile pointer to tell the compiler that the value
// may change in the background: the kernel updates it as the I/O completes.
volatile ntapi::IO_STATUS_BLOCK* iosb = &state.operations[i];
if (iosb->Status == STATUS_PENDING) {
return false;
}
}
// Make sure that we have visibility of the results of the I/O operations
// (assuming that the OS used a "release" memory semantic when writing
// to the 'Status' field).
std::atomic_thread_fence(std::memory_order_acquire);
return true;
}
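// Maps a pointer to one of the IO_STATUS_BLOCKs embedded in a pooled
// vectored_io_state back to the state that contains it. Since the states
// live in a single contiguous array, dividing the byte offset from the
// start of the pool by sizeof(vectored_io_state) yields the index of the
// containing element.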
low_latency_iocp_context::vectored_io_state*
low_latency_iocp_context::to_io_state(ntapi::IO_STATUS_BLOCK* iosb) noexcept {
vectored_io_state* const pool = ioPool_.get();
std::ptrdiff_t offset =
reinterpret_cast<char*>(iosb) - reinterpret_cast<char*>(pool);
UNIFEX_ASSERT(offset >= 0);
std::ptrdiff_t index = offset / sizeof(vectored_io_state);
UNIFEX_ASSERT(index < static_cast<std::ptrdiff_t>(ioPoolSize_));
return &pool[index];
}
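// Enqueues work onto the context, choosing the cheap thread-local path when
// called from the I/O thread itself and the cross-thread path otherwise.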
void low_latency_iocp_context::schedule(operation_base* op) noexcept {
if (is_running_on_io_thread()) {
schedule_local(op);
} else {
schedule_remote(op);
}
}
void low_latency_iocp_context::schedule_local(operation_base* op) noexcept {
UNIFEX_ASSERT(is_running_on_io_thread());
readyQueue_.push_back(op);
}
void low_latency_iocp_context::schedule_remote(operation_base* op) noexcept {
UNIFEX_ASSERT(!is_running_on_io_thread());
if (remoteQueue_.enqueue(op)) {
// I/O thread is potentially sleeping.
// Post a wake-up NOP event to the queue.
// BUGBUG: This could potentially fail and if it does then
// the I/O thread might never respond to remote enqueues.
// For now we just treat this as a (hopefully rare) unrecoverable error.
ntapi::NTSTATUS ntstat = ntapi::NtSetIoCompletion(
iocp_.get(),
0, // KeyContext
nullptr, // ApcContext
0, // NTSTATUS (0 = success)
0); // IoStatusInformation
if (!ntapi::ntstatus_success(ntstat)) {
// Failed to post an event to the I/O completion port.
std::terminate();
}
}
}
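// Tries to hand a free vectored_io_state to 'op'. Returns false if the pool
// is exhausted, in which case the caller can use
// schedule_when_io_state_available() to be resumed once a state is released.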
bool low_latency_iocp_context::try_allocate_io_state_for(
io_operation* op) noexcept {
UNIFEX_ASSERT(is_running_on_io_thread());
if (ioFreeList_.empty()) {
return false;
}
UNIFEX_ASSERT(pendingIoQueue_.empty());
// An I/O state is available on the free-list; hand it straight to 'op'.
auto* state = ioFreeList_.pop_front();
state->parent = op;
state->completed = false;
state->operationCount = 0;
state->pendingCompletionNotifications = 0;
op->ioState = state;
return true;
}
void low_latency_iocp_context::schedule_when_io_state_available(
io_operation* op) noexcept {
UNIFEX_ASSERT(is_running_on_io_thread());
UNIFEX_ASSERT(ioFreeList_.empty());
pendingIoQueue_.push_back(op);
}
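// Returns a vectored_io_state to the pool. If another operation is already
// waiting for a state, it is handed over directly and scheduled rather than
// pushed onto the free-list. If completion notifications are still
// outstanding, the state is instead marked as detached and freed later, when
// the last notification arrives in the run loop.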
void low_latency_iocp_context::release_io_state(
vectored_io_state* state) noexcept {
UNIFEX_ASSERT(is_running_on_io_thread());
if (state->pendingCompletionNotifications == 0) {
// Can be freed immediately.
if (pendingIoQueue_.empty()) {
ioFreeList_.push_front(state);
} else {
UNIFEX_ASSERT(ioFreeList_.empty());
// Another operation was waiting for an I/O state.
// Give the I/O state directly to the operation instead
// of adding it to the free-list. This prevents some other
// operation from skipping the queue and acquiring it
// before this I/O operation can run.
auto* op = static_cast<io_operation*>(pendingIoQueue_.pop_front());
state->parent = op;
state->completed = false;
state->operationCount = 0;
state->pendingCompletionNotifications = 0;
op->ioState = state;
schedule_local(op);
}
} else {
// Mark it to be freed once all of its I/O completion
// notifications have been received.
state->parent = nullptr;
}
}
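// Queues an I/O operation whose completion we may be able to observe by
// polling its IO_STATUS_BLOCKs rather than waiting for the IOCP
// notification. If no notifications are outstanding it can run immediately.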
void low_latency_iocp_context::schedule_poll_io(io_operation* op) noexcept {
UNIFEX_ASSERT(is_running_on_io_thread());
UNIFEX_ASSERT(op != nullptr);
UNIFEX_ASSERT(op->ioState != nullptr);
if (op->ioState->pendingCompletionNotifications > 0) {
pollQueue_.push_back(op);
} else {
// No need to poll; schedule it straight onto the front of the
// ready-to-run queue.
readyQueue_.push_front(op);
}
}
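// Associates a file handle with the context's completion port and enables
// FILE_SKIP_COMPLETION_PORT_ON_SUCCESS and FILE_SKIP_SET_EVENT_ON_HANDLE so
// that operations which complete synchronously do not also post a
// completion packet or signal the file handle.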
void low_latency_iocp_context::associate_file_handle(handle_t fileHandle) {
const HANDLE result =
::CreateIoCompletionPort(fileHandle, iocp_.get(), 0, 0);
if (result == nullptr) {
DWORD errorCode = ::GetLastError();
throw_(std::system_error{
static_cast<int>(errorCode),
std::system_category(),
"CreateIoCompletionPort"});
}
const BOOL ok = ::SetFileCompletionNotificationModes(
fileHandle,
FILE_SKIP_COMPLETION_PORT_ON_SUCCESS | FILE_SKIP_SET_EVENT_ON_HANDLE);
if (!ok) {
DWORD errorCode = ::GetLastError();
throw_(std::system_error{
static_cast<int>(errorCode),
std::system_category(),
"SetFileCompletionNotificationModes"});
}
}
void low_latency_iocp_context::io_operation::cancel_io() noexcept {
UNIFEX_ASSERT(ioState != nullptr);
// Cancel the operations in reverse order so that later operations are
// cancelled first; otherwise a race could leave an earlier operation
// cancelled while a later one still completes.
for (std::uint16_t i = ioState->operationCount; i != 0; --i) {
ntapi::IO_STATUS_BLOCK* iosb = &ioState->operations[i - 1];
ntapi::IO_STATUS_BLOCK ioStatus;
[[maybe_unused]] ntapi::NTSTATUS ntstat =
ntapi::NtCancelIoFileEx(fileHandle, iosb, &ioStatus);
// TODO: Check ntstat for failure.
// Can't really do much here even if there is failure, anyway.
}
}
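// Returns true once every chunk of this operation has completed, either
// because all completion notifications have been received or because
// polling the IO_STATUS_BLOCKs shows that none of them is still pending.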
bool low_latency_iocp_context::io_operation::is_complete() noexcept {
UNIFEX_ASSERT(context.is_running_on_io_thread());
UNIFEX_ASSERT(ioState != nullptr);
if (ioState->pendingCompletionNotifications == 0) {
return true;
}
for (std::size_t i = 0; i < ioState->operationCount; ++i) {
// Access through a volatile pointer to tell the compiler that the value
// may change in the background: the kernel updates it as the I/O completes.
volatile ntapi::IO_STATUS_BLOCK* iosb = &ioState->operations[i];
if (iosb->Status == STATUS_PENDING) {
return false;
}
}
// Make sure that we have visibility of the results of all of the I/O
// operations (assuming that the OS used a "release" memory semantic when
// writing to the 'Status' field).
std::atomic_thread_fence(std::memory_order_acquire);
return true;
}
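// Launches one or more NtReadFile() calls covering 'buffer', recording each
// chunk's IO_STATUS_BLOCK in the attached vectored_io_state. Returns true
// only when the whole buffer was submitted without hitting the per-operation
// chunk limit; returns false if a request failed to start immediately or if
// the chunk limit was reached.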
bool low_latency_iocp_context::io_operation::start_read(
span<std::byte> buffer) noexcept {
UNIFEX_ASSERT(context.is_running_on_io_thread());
UNIFEX_ASSERT(ioState != nullptr);
UNIFEX_ASSERT(ioState->operationCount < max_vectored_io_size);
std::size_t offset = 0;
while (offset < buffer.size()) {
ntapi::IO_STATUS_BLOCK& iosb =
ioState->operations[ioState->operationCount];
++ioState->operationCount;
iosb.Status = STATUS_PENDING;
iosb.Information = 0;
// Truncate over-large chunks to a number of bytes that will still
// preserve alignment requirements of the underlying device if this
// happens to be an unbuffered storage device.
// In this case we truncate to the largest multiple of 64k less than
// 2^32 to allow for up to 64k alignment.
// TODO: Ideally we'd just use the underlying alignment of the
// file-handle.
static constexpr std::size_t truncatedChunkSize = 0xFFFF0000u;
static constexpr std::size_t maxChunkSize = 0xFFFFFFFFu;
std::size_t chunkSize = buffer.size() - offset;
if (chunkSize > maxChunkSize) {
chunkSize = truncatedChunkSize;
}
ntapi::NTSTATUS status = ntapi::NtReadFile(
fileHandle,
NULL, // Event
NULL, // ApcRoutine
&iosb, // ApcContext
&iosb, // IoStatusBlock
buffer.data() + offset, // Buffer
static_cast<ntapi::ULONG>(chunkSize), // Length
nullptr, // ByteOffset
nullptr); // Key
if (status == STATUS_PENDING) {
++ioState->pendingCompletionNotifications;
} else if (ntapi::ntstatus_success(status)) {
// Succeeded synchronously.
if (!skipNotificationOnSuccess) {
++ioState->pendingCompletionNotifications;
}
} else {
// Immediate failure.
// Don't launch any more operations.
// TODO: Should we cancel any prior launched operations?
return false;
}
if (ioState->operationCount == max_vectored_io_size) {
// We've launched as many operations as we can.
return false;
}
offset += chunkSize;
}
return true;
}
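// Mirror of start_read() for writes: submits 'buffer' as one or more
// NtWriteFile() calls with the same chunking, accounting and return-value
// behaviour.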
bool low_latency_iocp_context::io_operation::start_write(
span<const std::byte> buffer) noexcept {
UNIFEX_ASSERT(context.is_running_on_io_thread());
UNIFEX_ASSERT(ioState != nullptr);
UNIFEX_ASSERT(ioState->operationCount < max_vectored_io_size);
std::size_t offset = 0;
while (offset < buffer.size()) {
ntapi::IO_STATUS_BLOCK& iosb =
ioState->operations[ioState->operationCount];
++ioState->operationCount;
iosb.Status = STATUS_PENDING;
iosb.Information = 0;
// Truncate over-large chunks to a number of bytes that will still
// preserve alignment requirements of the underlying device if this
// happens to be an unbuffered storage device.
// In this case we truncate to the largest multiple of 64k less than
// 2^32 to allow for up to 64k alignment.
// TODO: Ideally we'd just use the underlying alignment of the
// file-handle.
static constexpr std::size_t truncatedChunkSize = 0xFFFF0000u;
static constexpr std::size_t maxChunkSize = 0xFFFFFFFFu;
std::size_t chunkSize = buffer.size() - offset;
if (chunkSize > maxChunkSize) {
chunkSize = truncatedChunkSize;
}
ntapi::NTSTATUS status = ntapi::NtWriteFile(
fileHandle,
NULL, // Event
NULL, // ApcRoutine
&iosb, // ApcContext
&iosb, // IoStatusBlock
const_cast<std::byte*>(buffer.data()) + offset, // Buffer
static_cast<ntapi::ULONG>(chunkSize), // Length
nullptr, // ByteOffset
nullptr); // Key
if (status == STATUS_PENDING) {
++ioState->pendingCompletionNotifications;
} else if (ntapi::ntstatus_success(status)) {
// Succeeded synchronously.
if (!skipNotificationOnSuccess) {
++ioState->pendingCompletionNotifications;
}
} else {
// Immediate failure.
// Don't launch any more operations.
// TODO: Should we cancel any prior launched operations?
return false;
}
if (ioState->operationCount == max_vectored_io_size) {
return false;
}
offset += chunkSize;
}
return true;
}
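// Sums the bytes transferred by each chunk of the completed operation. If a
// chunk failed, 'ec' is set from the first failing chunk's status and any
// later chunks are ignored.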
std::size_t low_latency_iocp_context::io_operation::get_result(
std::error_code& ec) noexcept {
UNIFEX_ASSERT(context.is_running_on_io_thread());
UNIFEX_ASSERT(ioState != nullptr);
UNIFEX_ASSERT(ioState->completed);
ec = std::error_code{};
std::size_t totalBytesTransferred = 0;
for (std::size_t i = 0; i < ioState->operationCount; ++i) {
const ntapi::IO_STATUS_BLOCK& iosb = ioState->operations[i];
UNIFEX_ASSERT(iosb.Status != STATUS_PENDING);
totalBytesTransferred += iosb.Information;
if (!ntapi::ntstatus_success(iosb.Status)) {
ntapi::ULONG errorCode = ntapi::RtlNtStatusToDosError(iosb.Status);
ec = std::error_code(
static_cast<int>(errorCode), std::system_category());
break;
}
}
return totalBytesTransferred;
}
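// Creates a unidirectional, pipe-based connection out of a uniquely named
// pipe: the server end is opened for inbound (read) access and the client
// end for write access, both in overlapped mode, and both handles are
// associated with this context's completion port before being wrapped in
// byte streams.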
std::tuple<
low_latency_iocp_context::readable_byte_stream,
low_latency_iocp_context::writable_byte_stream>
low_latency_iocp_context::scheduler::open_pipe_impl(
low_latency_iocp_context& context) {
std::mt19937_64 rand{::GetTickCount64() + ::GetCurrentThreadId()};
safe_handle serverHandle;
auto toHex = [](std::uint8_t value) noexcept -> char {
return value < 10 ? char('0' + value) : char('a' - 10 + value);
};
char pipeName[10 + 16] = {'\\', '\\', '.', '\\', 'p', 'i', 'p', 'e', '\\'};
// Try to create the server side of a new named pipe, using a randomly
// generated name to ensure uniqueness.
const DWORD pipeBufferSize = 16 * 1024;
const int maxAttempts = 100;
for (int attempt = 1;; ++attempt) {
const std::uint64_t randomNumber = rand();
for (int i = 0; i < 16; ++i) {
pipeName[9 + i] = toHex((randomNumber >> (4 * i)) & 0xFu);
}
pipeName[25] = '\0';
HANDLE pipe = ::CreateNamedPipeA(
pipeName,
PIPE_ACCESS_INBOUND | FILE_FLAG_OVERLAPPED |
FILE_FLAG_FIRST_PIPE_INSTANCE,
PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_REJECT_REMOTE_CLIENTS |
PIPE_WAIT,
1, // nMaxInstances
0, // nOutBufferSize
pipeBufferSize, // nInBufferSize
0, // nDefaultTimeout
nullptr); // lpSecurityAttributes
if (pipe == INVALID_HANDLE_VALUE) {
DWORD errorCode = ::GetLastError();
if (errorCode == ERROR_ALREADY_EXISTS && attempt < maxAttempts) {
// Try again with a different random name
continue;
}
throw_(std::system_error{
static_cast<int>(errorCode),
std::system_category(),
"open_pipe: CreateNamedPipe"});
}
serverHandle = safe_handle{pipe};
break;
}
// Open the client-side of this named-pipe
safe_handle clientHandle{::CreateFileA(
pipeName,
GENERIC_WRITE,
0, // dwShareMode
nullptr, // lpSecurityAttributes
OPEN_EXISTING, // dwCreationDisposition
FILE_FLAG_OVERLAPPED,
nullptr)};
if (clientHandle.get() == INVALID_HANDLE_VALUE) {
DWORD errorCode = ::GetLastError();
throw_(std::system_error{
static_cast<int>(errorCode),
std::system_category(),
"open_pipe: CreateFile"});
}
context.associate_file_handle(serverHandle.get());
context.associate_file_handle(clientHandle.get());
return {
readable_byte_stream{context, std::move(serverHandle)},
writable_byte_stream{context, std::move(clientHandle)}};
}
} // namespace unifex::win32