// include/unifex/win32/low_latency_iocp_context.hpp
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License Version 2.0 with LLVM Exceptions
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://llvm.org/LICENSE.txt
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <unifex/io_concepts.hpp>
#include <unifex/manual_lifetime.hpp>
#include <unifex/pipe_concepts.hpp>
#include <unifex/receiver_concepts.hpp>
#include <unifex/scheduler_concepts.hpp>
#include <unifex/sender_concepts.hpp>
#include <unifex/span.hpp>
#include <unifex/std_concepts.hpp>
#include <unifex/get_stop_token.hpp>
#include <unifex/detail/atomic_intrusive_queue.hpp>
#include <unifex/detail/intrusive_list.hpp>
#include <unifex/detail/intrusive_stack.hpp>
#include <unifex/win32/detail/ntapi.hpp>
#include <unifex/win32/detail/safe_handle.hpp>
#include <unifex/win32/detail/types.hpp>
#include <array>
#include <cstdint>
#include <system_error>
#include <thread>
#include <tuple>
#include <unifex/detail/prologue.hpp>
namespace unifex::win32
{
class low_latency_iocp_context {
class scheduler;
class readable_byte_stream;
class writable_byte_stream;
class schedule_sender;
template <typename Receiver>
struct _schedule_op {
class type;
};
template <typename Receiver>
using schedule_op = typename _schedule_op<Receiver>::type;
template <typename Buffer>
struct _read_file_sender {
class type;
};
template <typename Buffer>
using read_file_sender = typename _read_file_sender<Buffer>::type;
template <typename Buffer>
struct _write_file_sender {
class type;
};
template <typename Buffer>
using write_file_sender = typename _write_file_sender<Buffer>::type;
template <typename Buffer, typename Receiver>
struct _read_file_op {
class type;
};
template <typename Buffer, typename Receiver>
using read_file_op = typename _read_file_op<Buffer, Receiver>::type;
template <typename Buffer, typename Receiver>
struct _write_file_op {
class type;
};
template <typename Buffer, typename Receiver>
using write_file_op = typename _write_file_op<Buffer, Receiver>::type;
public:
// Initialise the IOCP context and pre-allocate storage for I/O
// state needed to support at most 'maxIoOperations' concurrent I/O
// operations.
explicit low_latency_iocp_context(std::size_t maxIoOperations);
low_latency_iocp_context(low_latency_iocp_context&&) = delete;
low_latency_iocp_context(const low_latency_iocp_context&) = delete;
low_latency_iocp_context&
operator=(const low_latency_iocp_context&) = delete;
low_latency_iocp_context& operator=(low_latency_iocp_context&&) = delete;
~low_latency_iocp_context();
// Drive the event loop until a request to stop is communicated via
// the passed StopToken.
template <typename StopToken>
void run(StopToken st);
// Obtain a handle to this execution context that can be used to
// schedule work and open I/O resources.
scheduler get_scheduler() noexcept;
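// Example usage (a sketch; the thread handling and 'stopSource' below are
// illustrative, using unifex::inplace_stop_source from
// <unifex/inplace_stop_token.hpp>):
//
//   unifex::win32::low_latency_iocp_context ctx{/*maxIoOperations=*/64};
//   unifex::inplace_stop_source stopSource;
//   std::thread ioThread{[&] { ctx.run(stopSource.get_token()); }};
//
//   auto sched = ctx.get_scheduler();
//   // ... connect and start work against 'sched' ...
//
//   stopSource.request_stop();
//   ioThread.join();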
private:
struct vectored_io_state;
// This value is chosen so that vectored_io_state is 512 bytes on 64-bit
// architectures and 256 bytes on 32-bit architectures.
static constexpr std::size_t max_vectored_io_size = 30;
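// Rough arithmetic (assuming the usual layouts): on 64-bit, the three
// pointers plus the two counters and flag pad the header to 32 bytes, and
// 30 * sizeof(IO_STATUS_BLOCK) = 30 * 16 = 480 bytes, totalling 512.
// On 32-bit the header pads to 16 bytes and 30 * 8 = 240, totalling 256.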
struct operation_base {
explicit operation_base(low_latency_iocp_context& ctx) noexcept
: context(ctx) {}
using callback_t = void(operation_base*) noexcept;
low_latency_iocp_context& context;
callback_t* callback = nullptr;
operation_base* next = nullptr;
operation_base* prev = nullptr;
};
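// Illustrative only: concrete operations derive from operation_base and
// point 'callback' at a static trampoline which the event loop invokes
// once the operation reaches the front of the ready queue (schedule_op
// further below follows exactly this pattern):
//
//   struct my_op : operation_base {
//     explicit my_op(low_latency_iocp_context& ctx) noexcept
//       : operation_base(ctx) {
//       this->callback = &on_ready;
//     }
//     static void on_ready(operation_base* op) noexcept {
//       auto& self = *static_cast<my_op*>(op);
//       // resume the suspended work here
//     }
//   };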
using operation_queue = intrusive_list<
operation_base,
&operation_base::next,
&operation_base::prev>;
struct io_operation : operation_base {
explicit io_operation(
low_latency_iocp_context& context,
handle_t fileHandle,
bool skipNotificationOnSuccess) noexcept
: operation_base(context)
, fileHandle(fileHandle)
, skipNotificationOnSuccess(skipNotificationOnSuccess)
, ioState(nullptr) {}
const handle_t fileHandle;
const bool skipNotificationOnSuccess;
vectored_io_state* ioState;
// Cancel outstanding I/O operations (if any)
void cancel_io() noexcept;
// Poll for whether or not the operation has completed.
// Returns 'true' if the operation is completed (in which case the
// operation has 'acquire' semantics), 'false' otherwise.
bool is_complete() noexcept;
// Start reading the next 'buffer.size()' bytes into 'buffer'.
//
// Returns 'true' if additional read operations can be submitted.
// This will only be the case if all of the following are true:
// - a read of the entirety of 'buffer' was successfully started
// - we haven't reached the maximum number of I/O operations in
// this batch.
//
// Once all I/O operations have been started, the caller should
// schedule a 'poll' of this operation by setting this->callback
// and calling context.schedule_poll_io(this). If the I/O completed
// synchronously, the callback will be scheduled to run immediately.
bool start_read(span<std::byte> buffer) noexcept;
// Start writing the contents of 'buffer' to fileHandle.
bool start_write(span<const std::byte> buffer) noexcept;
std::size_t get_result(std::error_code& ec) noexcept;
};
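// Intended call sequence for the methods above (a sketch; the names
// 'nextChunk', 'haveMoreChunks' and the callbacks are hypothetical, but
// the read/write operation-states later in this header follow this
// protocol):
//
//   if (context.try_allocate_io_state_for(this)) {
//     // Submit reads while this batch still has capacity.
//     while (start_read(nextChunk()) && haveMoreChunks()) {}
//     this->callback = &on_complete;
//     context.schedule_poll_io(this);  // polled before blocking on the IOCP
//   } else {
//     this->callback = &start_io;      // retried once a state frees up
//     context.schedule_when_io_state_available(this);
//   }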
struct vectored_io_state {
// Parent operation.
//
// May be nullptr if the operation has already been observed to complete
// via a poll. In that case, once all pending completion notifications
// have been received, this state goes straight back on the free-list.
io_operation* parent = nullptr;
// Intrusive list ptr.
vectored_io_state* next = nullptr;
vectored_io_state* prev = nullptr;
// Total number of operations started
std::uint8_t operationCount = 0;
// Number of operations not yet received completion-notification
// via the IOCP. The vectored_io_state structure is not free to
// be reused until this number reaches zero.
std::uint8_t pendingCompletionNotifications = 0;
// Whether or not the 'parent' has already been notified of completion.
bool completed = false;
ntapi::IO_STATUS_BLOCK operations[max_vectored_io_size];
};
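// Lifecycle sketch (inferred from the members above): each I/O submitted
// through a vectored_io_state bumps 'operationCount', and each submission
// that will post a completion packet to the IOCP also leaves a pending
// notification to be reaped. The state may only return to the free-list
// once the parent has consumed the results and
// 'pendingCompletionNotifications' has dropped back to zero.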
struct stop_operation : operation_base {
explicit stop_operation(low_latency_iocp_context& ctx) noexcept
: operation_base(ctx) {
this->callback = &request_stop_callback;
}
~stop_operation() {
if (isEnqueued) {
// Flush any items in the remote-queue into the ready queue
// just in case this operation is still in the remote queue.
(void)context.try_dequeue_remote_work();
// This operation should now be in the ready queue, remove it.
context.readyQueue_.remove(this);
}
}
void start() noexcept {
if (context.is_running_on_io_thread()) {
stopRequestedFlag = true;
} else {
isEnqueued = true;
context.schedule_remote(this);
}
}
static void request_stop_callback(operation_base* op) noexcept {
auto& self = *static_cast<stop_operation*>(op);
self.stopRequestedFlag = true;
self.isEnqueued = false;
}
bool stopRequestedFlag = false;
private:
bool isEnqueued = false;
};
struct stop_callback {
stop_operation& op;
void operator()() noexcept { op.start(); }
};
void run_impl(bool& stopFlag);
// Dequeue items from the remote queue and move them to the
// ready-to-run queue.
// Returns true if any items were dequeued.
bool try_dequeue_remote_work() noexcept;
bool poll_is_complete(vectored_io_state& state) noexcept;
// Obtain the I/O state that contains a given io_status_block structure.
vectored_io_state* to_io_state(ntapi::IO_STATUS_BLOCK* io) noexcept;
bool is_running_on_io_thread() const noexcept {
return activeThreadId_.load(std::memory_order_relaxed) ==
std::this_thread::get_id();
}
void schedule(operation_base* op) noexcept;
void schedule_local(operation_base* op) noexcept;
void schedule_remote(operation_base* op) noexcept;
// If an unused I/O state is available, attaches it to op->ioState and
// returns true; otherwise returns false.
//
// If 'true' is returned then the caller can populate op->ioState
// and initiate the I/O using its IO_STATUS_BLOCK structures.
[[nodiscard]] bool try_allocate_io_state_for(io_operation* op) noexcept;
// Schedule the specified 'op' to be called back (calling op->callback)
// when an I/O state becomes available and has been allocated to 'op'.
//
// Do this only after an unsuccessful call to try_allocate_io_state_for()
// asynchronously wait until some other I/O operation completes and frees
// up an other I/O state.
void schedule_when_io_state_available(io_operation* op) noexcept;
// Mark the I/O state as released and return it to the pool.
void release_io_state(vectored_io_state* state) noexcept;
// Schedule this operation to be polled next time we run out of work
// in the ready-queue before going back to the OS for completion-events.
void schedule_poll_io(io_operation* op) noexcept;
// Attempt to associate the specified file-handle with this I/O context
// so that its I/O completion events are posted to this context's IOCP.
void associate_file_handle(handle_t fileHandle);
private:
/////
// State that won't change after initialisation, or that changes rarely
// (e.g. only on each call to run()).
std::atomic<std::thread::id> activeThreadId_;
safe_handle iocp_;
std::size_t ioPoolSize_;
std::unique_ptr<vectored_io_state[]> ioPool_;
/////
// State that is accessed/modified by the I/O thread only.
alignas(64) intrusive_stack<
vectored_io_state,
&vectored_io_state::next> ioFreeList_;
// Newly launched operations that we want to poll for completion as soon as
// we run out of 'ready-to-run' work to do.
operation_queue pollQueue_;
// Operations waiting to acquire a vectored_io_operation.
// These will be resumed in-order as vectored_io_operations are returned to
// the pool.
operation_queue pendingIoQueue_;
// Operations that are ready to run.
operation_queue readyQueue_;
/////
// State that may be accessed/modified by other threads.
alignas(64) atomic_intrusive_queue<
operation_base,
&operation_base::next> remoteQueue_;
};
template <typename StopToken>
void low_latency_iocp_context::run(StopToken stopToken) {
stop_operation op{*this};
typename StopToken::template callback_type<stop_callback> cb{
std::move(stopToken), stop_callback{op}};
run_impl(op.stopRequestedFlag);
}
///////////////////////////
// schedule
template <typename Receiver>
class low_latency_iocp_context::_schedule_op<Receiver>::type
: private low_latency_iocp_context::operation_base {
public:
template <typename Receiver2>
explicit type(low_latency_iocp_context& context, Receiver2&& r)
: operation_base(context)
, receiver_((Receiver2 &&) r) {
this->callback = &execute_callback;
}
void start() & noexcept { this->context.schedule(this); }
private:
static void execute_callback(operation_base* op) noexcept {
auto& self = *static_cast<type*>(op);
if constexpr (!is_stop_never_possible_v<stop_token_type_t<Receiver>>) {
if (get_stop_token(self.receiver_).stop_requested()) {
unifex::set_done(std::move(self.receiver_));
return;
}
}
if constexpr (is_nothrow_callable_v<
decltype(unifex::set_value),
Receiver>) {
unifex::set_value(std::move(self.receiver_));
} else {
UNIFEX_TRY {
unifex::set_value(std::move(self.receiver_));
} UNIFEX_CATCH (...) {
unifex::set_error(
std::move(self.receiver_), std::current_exception());
}
}
}
Receiver receiver_;
};
class low_latency_iocp_context::schedule_sender {
public:
template <
template <typename...>
class Variant,
template <typename...>
class Tuple>
using value_types = Variant<Tuple<>>;
template <template <typename...> class Variant>
using error_types = Variant<std::exception_ptr>;
static constexpr bool sends_done = true;
explicit schedule_sender(low_latency_iocp_context& context) noexcept
: context_(context) {}
template(typename Receiver)
    (requires receiver_of<Receiver>)
friend auto tag_invoke(tag_t<connect>, const schedule_sender& s, Receiver&& r)
    noexcept(std::is_nothrow_constructible_v<remove_cvref_t<Receiver>, Receiver>)
    -> schedule_op<remove_cvref_t<Receiver>> {
return schedule_op<remove_cvref_t<Receiver>>{s.context_, (Receiver &&) r};
}
private:
template <typename Receiver>
using schedule_op = low_latency_iocp_context::schedule_op<Receiver>;
low_latency_iocp_context& context_;
};
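// Example (a sketch): hopping onto the I/O thread via the scheduler,
// assuming ctx.run() is being driven on another thread and using the
// standard unifex CPOs schedule/then/sync_wait:
//
//   auto sched = ctx.get_scheduler();
//   unifex::sync_wait(unifex::then(unifex::schedule(sched), [] {
//     // now running on the context's I/O thread
//   }));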
///////////////////////////
// read_file
template <typename Buffer, typename Receiver>
class low_latency_iocp_context::_read_file_op<Buffer, Receiver>::type
: private low_latency_iocp_context::io_operation {
public:
template <typename Receiver2, typename Buffer2>
explicit type(
low_latency_iocp_context& ctx,
handle_t fileHandle,
bool skipNotificationOnSuccess,
Buffer2&& buffer,
Receiver2&& r)
: io_operation(ctx, fileHandle, skipNotificationOnSuccess)
, receiver_((Receiver2 &&) r)
, buffer_((Buffer2 &&) buffer) {}
void start() & noexcept {
if (this->context.is_running_on_io_thread()) {
acquire_io_state(this);
} else {
this->callback = &acquire_io_state;
this->context.schedule_remote(this);
}
}
private:
struct cancel_callback {
type& op;
void operator()() noexcept { op.cancel_io(); }
};
static void acquire_io_state(operation_base* op) noexcept {
type& self = *static_cast<type*>(op);
UNIFEX_ASSERT(self.context.is_running_on_io_thread());
if (self.context.try_allocate_io_state_for(&self)) {
start_io(op);
} else {
// TODO: Add support for cancellation while waiting in the
// pendingIoQueue_.
self.callback = &start_io;
self.context.schedule_when_io_state_available(&self);
}
}
static void start_io(operation_base* op) noexcept {
type& self = *static_cast<type*>(op);
UNIFEX_ASSERT(self.context.is_running_on_io_thread());
// TODO: Add support for a BufferSequence concept to allow
// vectorised I/O here.
// For now, just assume that 'Buffer' is convertible to a
// 'span<std::byte>'.
self.start_read(self.buffer_);
if (self.ioState->pendingCompletionNotifications == 0) {
self.ioState->completed = true;
self.callback = &on_complete;
self.context.readyQueue_.push_front(op);
} else {
if constexpr (!is_stop_never_possible_v<stop_token_type_t<Receiver>>) {
self.stopCallback_.construct(
get_stop_token(self.receiver_), cancel_callback{self});
self.callback = &on_cancellable_complete;
} else {
self.callback = &on_complete;
}
self.context.schedule_poll_io(&self);
}
}
static void on_cancellable_complete(operation_base* op) noexcept {
auto& self = *static_cast<type*>(op);
UNIFEX_ASSERT(self.context.is_running_on_io_thread());
self.stopCallback_.destruct();
on_complete(op);
}
static void on_complete(operation_base* op) noexcept {
type& self = *static_cast<type*>(op);
std::error_code ec;
std::size_t totalBytesTransferred = self.get_result(ec);
self.context.release_io_state(self.ioState);
// Treat partial failure as a success case.
// TODO: Should we be sending tuple of (bytesTransferred, ec) here
// instead?
if (!ec || totalBytesTransferred > 0) {
if constexpr (is_nothrow_callable_v<
decltype(set_value),
Receiver,
std::size_t>) {
unifex::set_value(std::move(self.receiver_), totalBytesTransferred);
} else {
UNIFEX_TRY {
unifex::set_value(std::move(self.receiver_), totalBytesTransferred);
} UNIFEX_CATCH (...) {
unifex::set_error(
std::move(self.receiver_), std::current_exception());
}
}
} else if (ec == std::errc::operation_canceled) {
unifex::set_done(std::move(self.receiver_));
} else {
unifex::set_error(std::move(self.receiver_), std::move(ec));
}
}
UNIFEX_NO_UNIQUE_ADDRESS Receiver receiver_;
UNIFEX_NO_UNIQUE_ADDRESS Buffer buffer_;
UNIFEX_NO_UNIQUE_ADDRESS manual_lifetime<
    typename stop_token_type_t<Receiver>::template callback_type<cancel_callback>>
    stopCallback_;
};
template <typename Buffer>
class low_latency_iocp_context::_read_file_sender<Buffer>::type {
public:
template <
template <typename...>
class Variant,
template <typename...>
class Tuple>
using value_types = Variant<Tuple<std::size_t>>;
template <template <typename...> class Variant>
using error_types = Variant<std::exception_ptr, std::error_code>;
static constexpr bool sends_done = true;
template <typename Buffer2>
explicit type(
low_latency_iocp_context& context,
handle_t fileHandle,
bool skipNotificationsOnSuccess,
Buffer2&& buffer)
: context_(context)
, fileHandle_(fileHandle)
, skipNotificationsOnSuccess_(skipNotificationsOnSuccess)
, buffer_((Buffer2 &&) buffer) {}
template(typename Receiver)
    (requires receiver_of<Receiver, std::size_t>)
friend auto tag_invoke(tag_t<unifex::connect>, type&& self, Receiver&& r)
    noexcept(std::is_nothrow_move_constructible_v<Buffer> &&
        std::is_nothrow_constructible_v<remove_cvref_t<Receiver>, Receiver>)
    -> read_file_op<Buffer, remove_cvref_t<Receiver>> {
return read_file_op<Buffer, remove_cvref_t<Receiver>>{
    self.context_,
    self.fileHandle_,
    self.skipNotificationsOnSuccess_,
    std::move(self.buffer_),
    (Receiver &&) r};
}
private:
template <typename Buf, typename Receiver>
using read_file_op = low_latency_iocp_context::read_file_op<Buf, Receiver>;
low_latency_iocp_context& context_;
handle_t fileHandle_;
bool skipNotificationsOnSuccess_;
Buffer buffer_;
};
//////////////////////////
// write_file
template <typename Buffer, typename Receiver>
class low_latency_iocp_context::_write_file_op<Buffer, Receiver>::type
: private low_latency_iocp_context::io_operation {
public:
template <typename Receiver2, typename Buffer2>
explicit type(
low_latency_iocp_context& ctx,
handle_t fileHandle,
bool skipNotificationOnSuccess,
Buffer2&& buffer,
Receiver2&& r)
: io_operation(ctx, fileHandle, skipNotificationOnSuccess)
, receiver_((Receiver2 &&) r)
, buffer_((Buffer2 &&) buffer) {}
void start() & noexcept {
if (this->context.is_running_on_io_thread()) {
acquire_io_state(this);
} else {
this->callback = &acquire_io_state;
this->context.schedule_remote(this);
}
}
private:
struct cancel_callback {
type& op;
void operator()() noexcept { op.cancel_io(); }
};
static void acquire_io_state(operation_base* op) noexcept {
type& self = *static_cast<type*>(op);
UNIFEX_ASSERT(self.context.is_running_on_io_thread());
if (self.context.try_allocate_io_state_for(&self)) {
start_io(op);
} else {
// TODO: Add support for cancellation while waiting in the
// pendingIoQueue_.
self.callback = &start_io;
self.context.schedule_when_io_state_available(&self);
}
}
static void start_io(operation_base* op) noexcept {
type& self = *static_cast<type*>(op);
UNIFEX_ASSERT(self.context.is_running_on_io_thread());
// TODO: Add support for a BufferSequence concept to allow
// vectorised I/O here.
// For now, just assume that 'Buffer' is convertible to a
// 'span<std::byte>'.
self.start_write(self.buffer_);
if (self.ioState->pendingCompletionNotifications == 0) {
self.ioState->completed = true;
self.callback = &on_complete;
self.context.readyQueue_.push_front(op);
} else {
if constexpr (!is_stop_never_possible_v<stop_token_type_t<Receiver>>) {
self.stopCallback_.construct(
get_stop_token(self.receiver_), cancel_callback{self});
self.callback = &on_cancellable_complete;
} else {
self.callback = &on_complete;
}
self.context.schedule_poll_io(&self);
}
}
static void on_cancellable_complete(operation_base* op) noexcept {
auto& self = *static_cast<type*>(op);
UNIFEX_ASSERT(self.context.is_running_on_io_thread());
self.stopCallback_.destruct();
on_complete(op);
}
static void on_complete(operation_base* op) noexcept {
type& self = *static_cast<type*>(op);
std::error_code ec;
std::size_t totalBytesTransferred = self.get_result(ec);
self.context.release_io_state(self.ioState);
// Treat partial failure as a success case.
// TODO: Should we be sending tuple of (bytesTransferred, ec) here
// instead?
if (!ec || totalBytesTransferred > 0) {
if constexpr (is_nothrow_callable_v<
decltype(set_value),
Receiver,
std::size_t>) {
unifex::set_value(std::move(self.receiver_), totalBytesTransferred);
} else {
UNIFEX_TRY {
unifex::set_value(std::move(self.receiver_), totalBytesTransferred);
} UNIFEX_CATCH (...) {
unifex::set_error(
std::move(self.receiver_), std::current_exception());
}
}
} else if (ec == std::errc::operation_canceled) {
unifex::set_done(std::move(self.receiver_));
} else {
unifex::set_error(std::move(self.receiver_), std::move(ec));
}
}
UNIFEX_NO_UNIQUE_ADDRESS Receiver receiver_;
UNIFEX_NO_UNIQUE_ADDRESS Buffer buffer_;
UNIFEX_NO_UNIQUE_ADDRESS manual_lifetime<
    typename stop_token_type_t<Receiver>::template callback_type<cancel_callback>>
    stopCallback_;
};
template <typename Buffer>
class low_latency_iocp_context::_write_file_sender<Buffer>::type {
public:
template <
template <typename...>
class Variant,
template <typename...>
class Tuple>
using value_types = Variant<Tuple<std::size_t>>;
template <template <typename...> class Variant>
using error_types = Variant<std::exception_ptr, std::error_code>;
static constexpr bool sends_done = true;
template <typename Buffer2>
explicit type(
low_latency_iocp_context& context,
handle_t fileHandle,
bool skipNotificationsOnSuccess,
Buffer2&& buffer)
: context_(context)
, fileHandle_(fileHandle)
, skipNotificationsOnSuccess_(skipNotificationsOnSuccess)
, buffer_((Buffer2 &&) buffer) {}
template(typename Receiver)
    (requires receiver_of<Receiver, std::size_t>)
friend auto tag_invoke(tag_t<unifex::connect>, type&& self, Receiver&& r)
    noexcept(std::is_nothrow_move_constructible_v<Buffer> &&
        std::is_nothrow_constructible_v<remove_cvref_t<Receiver>, Receiver>)
    -> write_file_op<Buffer, remove_cvref_t<Receiver>> {
return write_file_op<Buffer, remove_cvref_t<Receiver>>{
    self.context_,
    self.fileHandle_,
    self.skipNotificationsOnSuccess_,
    std::move(self.buffer_),
    (Receiver &&) r};
}
private:
template <typename Buf, typename Receiver>
using write_file_op = low_latency_iocp_context::write_file_op<Buf, Receiver>;
low_latency_iocp_context& context_;
handle_t fileHandle_;
bool skipNotificationsOnSuccess_;
Buffer buffer_;
};
class low_latency_iocp_context::readable_byte_stream {
public:
explicit readable_byte_stream(
low_latency_iocp_context& context, safe_handle fileHandle) noexcept
: context_(context)
, fileHandle_(std::move(fileHandle)) {}
template(typename Buffer)
    (requires convertible_to<Buffer, span<std::byte>>)
friend read_file_sender<remove_cvref_t<Buffer>> tag_invoke(
    tag_t<async_read_some>, readable_byte_stream& stream, Buffer&& buffer) {
return read_file_sender<remove_cvref_t<Buffer>>{
    stream.context_, stream.fileHandle_.get(), true, (Buffer &&) buffer};
}
private:
template <typename Buffer>
using read_file_sender = low_latency_iocp_context::read_file_sender<Buffer>;
low_latency_iocp_context& context_;
safe_handle fileHandle_;
};
class low_latency_iocp_context::writable_byte_stream {
public:
explicit writable_byte_stream(
low_latency_iocp_context& context, safe_handle fileHandle) noexcept
: context_(context)
, fileHandle_(std::move(fileHandle)) {}
template(typename Buffer)
    (requires convertible_to<Buffer, span<const std::byte>>)
friend write_file_sender<remove_cvref_t<Buffer>> tag_invoke(
    tag_t<async_write_some>, writable_byte_stream& stream, Buffer&& buffer) {
return write_file_sender<remove_cvref_t<Buffer>>{
    stream.context_, stream.fileHandle_.get(), true, (Buffer &&) buffer};
}
private:
template <typename Buffer>
using write_file_sender = low_latency_iocp_context::write_file_sender<Buffer>;
low_latency_iocp_context& context_;
safe_handle fileHandle_;
};
class low_latency_iocp_context::scheduler {
public:
explicit scheduler(low_latency_iocp_context& context) noexcept
: context_(&context) {}
schedule_sender schedule() const noexcept {
return schedule_sender{*context_};
}
friend std::tuple<readable_byte_stream, writable_byte_stream>
tag_invoke(tag_t<unifex::open_pipe>, scheduler s) {
return open_pipe_impl(*s.context_);
}
friend bool operator==(scheduler a, scheduler b) noexcept {
return a.context_ == b.context_;
}
friend bool operator!=(scheduler a, scheduler b) noexcept {
return !(a == b);
}
private:
static std::tuple<readable_byte_stream, writable_byte_stream>
open_pipe_impl(low_latency_iocp_context& ctx);
low_latency_iocp_context* context_;
};
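// Example (a sketch): a pipe round-trip through this context, assuming
// ctx.run() is being driven on an I/O thread as in the earlier sketch;
// buffer sizes and the payload are illustrative.
//
//   auto [readEnd, writeEnd] = unifex::open_pipe(ctx.get_scheduler());
//
//   std::array<std::byte, 16> sendBuf{};  // payload to send
//   unifex::sync_wait(unifex::async_write_some(
//       writeEnd,
//       unifex::span<const std::byte>{sendBuf.data(), sendBuf.size()}));
//
//   std::array<std::byte, 16> recvBuf{};
//   // sync_wait yields std::optional<std::size_t>: the bytes read.
//   auto bytesRead = unifex::sync_wait(unifex::async_read_some(
//       readEnd, unifex::span<std::byte>{recvBuf.data(), recvBuf.size()}));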
inline low_latency_iocp_context::scheduler
low_latency_iocp_context::get_scheduler() noexcept {
return scheduler{*this};
}
} // namespace unifex::win32
#include <unifex/detail/epilogue.hpp>