// include/unifex/linux/io_uring_context.hpp
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License Version 2.0 with LLVM Exceptions
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://llvm.org/LICENSE.txt
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <unifex/config.hpp>
#if !UNIFEX_NO_LIBURING
#include <unifex/detail/atomic_intrusive_queue.hpp>
#include <unifex/detail/intrusive_heap.hpp>
#include <unifex/detail/intrusive_queue.hpp>
#include <unifex/file_concepts.hpp>
#include <unifex/filesystem.hpp>
#include <unifex/get_stop_token.hpp>
#include <unifex/manual_lifetime.hpp>
#include <unifex/receiver_concepts.hpp>
#include <unifex/span.hpp>
#include <unifex/stop_token_concepts.hpp>
#include <unifex/linux/mmap_region.hpp>
#include <unifex/linux/monotonic_clock.hpp>
#include <unifex/linux/safe_file_descriptor.hpp>
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <optional>
#include <system_error>
#include <utility>
#include <liburing/io_uring.h>
#include <sys/uio.h>
#include <unifex/detail/prologue.hpp>
namespace unifex {
namespace linuxos {
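// An I/O event loop that multiplexes scheduled work, timers and file I/O
// over a Linux io_uring instance.
//
// A single thread drives the loop by calling run(); any thread may submit
// work to the context through a scheduler obtained from get_scheduler().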
class io_uring_context {
public:
class schedule_sender;
class schedule_at_sender;
template <typename Duration>
class schedule_after_sender;
class read_sender;
class write_sender;
class async_read_only_file;
class async_read_write_file;
class async_write_only_file;
class scheduler;
io_uring_context();
~io_uring_context();
template <typename StopToken>
void run(StopToken stopToken);
scheduler get_scheduler() noexcept;
private:
struct operation_base {
operation_base() noexcept {}
operation_base* next_;
void (*execute_)(operation_base*) noexcept;
};
struct completion_base : operation_base {
int result_;
};
struct stop_operation : operation_base {
stop_operation() noexcept {
this->execute_ = [](operation_base* op) noexcept {
static_cast<stop_operation*>(op)->shouldStop_ = true;
};
}
bool shouldStop_ = false;
};
using time_point = linuxos::monotonic_clock::time_point;
struct schedule_at_operation : operation_base {
explicit schedule_at_operation(
io_uring_context& context,
const time_point& dueTime,
bool canBeCancelled) noexcept
: context_(context),
dueTime_(dueTime),
canBeCancelled_(canBeCancelled) {}
schedule_at_operation* timerNext_;
schedule_at_operation* timerPrev_;
io_uring_context& context_;
time_point dueTime_;
bool canBeCancelled_;
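// Bit-flags for 'state_' below, used to arbitrate the race between the
// I/O thread marking the timer as elapsed and a remote thread requesting
// cancellation (see schedule_at_sender::operation::request_stop_remote()).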
static constexpr std::uint32_t timer_elapsed_flag = 1;
static constexpr std::uint32_t cancel_pending_flag = 2;
std::atomic<std::uint32_t> state_ = 0;
};
using operation_queue =
intrusive_queue<operation_base, &operation_base::next_>;
using timer_heap = intrusive_heap<
schedule_at_operation,
&schedule_at_operation::timerNext_,
&schedule_at_operation::timerPrev_,
time_point,
&schedule_at_operation::dueTime_>;
bool is_running_on_io_thread() const noexcept;
void run_impl(const bool& shouldStop);
void schedule_impl(operation_base* op);
void schedule_local(operation_base* op) noexcept;
void schedule_local(operation_queue ops) noexcept;
void schedule_remote(operation_base* op) noexcept;
// Schedule an operation to be run when an I/O slot next becomes available.
void schedule_pending_io(operation_base* op) noexcept;
void reschedule_pending_io(operation_base* op) noexcept;
// Insert the timer operation into the queue of timers.
// Must be called from the I/O thread.
void schedule_at_impl(schedule_at_operation* op) noexcept;
// Execute all ready-to-run items on the local queue.
// Will not run items that are enqueued while the already-enqueued items
// execute; this bounds each call to a finite amount of work.
void execute_pending_local() noexcept;
// Check if any completion queue items are available and if so add them
// to the local queue.
void acquire_completion_queue_items() noexcept;
// Check if any completion queue items have been enqueued and move them
// to the local queue.
void acquire_remote_queued_items() noexcept;
// Submit a request to the submission queue containing an IORING_OP_POLL_ADD
// for the remote queue eventfd, as a way of registering for asynchronous
// notification of someone enqueueing work to the remote queue.
//
// Returns true if successful. If so then it is no longer permitted
// to call 'acquire_remote_queued_items()' until after the completion
// for this POLL_ADD operation is received.
//
// Returns false if either no more operations can be submitted at this
// time (submission queue full or too many pending completions) or if
// some other thread concurrently enqueued work to the remote queue.
bool try_register_remote_queue_notification() noexcept;
// Signal the remote queue eventfd.
//
// This should only be called after trying to enqueue() work
// to the remoteQueue and being told that the I/O thread is
// inactive.
void signal_remote_queue();
void remove_timer(schedule_at_operation* op) noexcept;
void update_timers() noexcept;
bool try_submit_timer_io(const time_point& dueTime) noexcept;
bool try_submit_timer_io_cancel() noexcept;
// Try to submit an entry to the submission queue.
//
// If there is space in both the submission and completion queues then
// populateSqe is invoked with a reference to a zeroed io_uring_sqe to
// fill out. If populateSqe returns void the entry is always submitted;
// if it returns bool, returning false abandons the entry.
//
// Returns true if the entry was submitted, false otherwise.
template <typename PopulateFn>
bool try_submit_io(PopulateFn populateSqe) noexcept;
// Total number of operations submitted that have not yet
// completed.
std::uint32_t pending_operation_count() const noexcept {
return cqPendingCount_ + sqUnflushedCount_;
}
// Query whether there is space in the submission ring buffer
// and space in the completion ring buffer for an additional
// entry.
bool can_submit_io() const noexcept {
return sqUnflushedCount_ < sqEntryCount_ &&
pending_operation_count() < cqEntryCount_;
}
std::uintptr_t timer_user_data() const {
return reinterpret_cast<std::uintptr_t>(&timers_);
}
std::uintptr_t remove_timer_user_data() const {
return reinterpret_cast<std::uintptr_t>(&currentDueTime_);
}
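// Local definition matching the layout of the kernel's __kernel_timespec,
// used for timer submissions (presumably defined here so that the header
// does not require a liburing/kernel version that exports the type).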
struct __kernel_timespec {
int64_t tv_sec;
long long tv_nsec;
};
////////
// Data that does not change once initialised.
// Submission queue state
std::uint32_t sqEntryCount_;
std::uint32_t sqMask_;
io_uring_sqe* sqEntries_;
unsigned* sqIndexArray_;
const std::atomic<unsigned>* sqHead_;
std::atomic<unsigned>* sqTail_;
std::atomic<unsigned>* sqFlags_;
std::atomic<unsigned>* sqDropped_;
// Completion queue state
std::uint32_t cqEntryCount_;
std::uint32_t cqMask_;
io_uring_cqe* cqEntries_;
std::atomic<unsigned>* cqHead_;
const std::atomic<unsigned>* cqTail_;
const std::atomic<unsigned>* cqOverflow_;
// Resources
safe_file_descriptor iouringFd_;
safe_file_descriptor remoteQueueEventFd_;
mmap_region cqMmap_;
mmap_region sqMmap_;
mmap_region sqeMmap_;
///////////////////
// Data that is modified by I/O thread
// Local queue for operations that are ready to execute.
operation_queue localQueue_;
// Operations that are waiting for more space in the I/O queues.
operation_queue pendingIoQueue_;
// Set of operations waiting to be executed at a specific time.
timer_heap timers_;
// The time that the current timer operation submitted to the kernel
// is due to elapse.
std::optional<time_point> currentDueTime_;
// Number of unflushed I/O submission entries.
std::uint32_t sqUnflushedCount_ = 0;
// Number of submitted operations that have not yet received a completion.
// We should ensure this number is never greater than cqEntryCount_ so that
// we don't end up with an overflowed completion queue.
std::uint32_t cqPendingCount_ = 0;
bool remoteQueueReadSubmitted_ = false;
bool timersAreDirty_ = false;
std::uint32_t activeTimerCount_ = 0;
__kernel_timespec time_;
//////////////////
// Data that is modified by remote threads
// Queue of operations enqueued by remote threads.
atomic_intrusive_queue<operation_base, &operation_base::next_> remoteQueue_;
};
template <typename StopToken>
void io_uring_context::run(StopToken stopToken) {
stop_operation stopOp;
auto onStopRequested = [&] { this->schedule_impl(&stopOp); };
typename StopToken::template callback_type<decltype(onStopRequested)>
stopCallback{std::move(stopToken), std::move(onStopRequested)};
run_impl(stopOp.shouldStop_);
}
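// Example of driving the context (a minimal sketch; assumes
// unifex::inplace_stop_source from <unifex/inplace_stop_token.hpp> and
// that 'ctx' outlives the I/O thread):
//
//   io_uring_context ctx;
//   inplace_stop_source stopSource;
//   std::thread ioThread{[&] { ctx.run(stopSource.get_token()); }};
//   // ... submit work via ctx.get_scheduler() ...
//   stopSource.request_stop();  // asks run() to return
//   ioThread.join();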
template <typename PopulateFn>
bool io_uring_context::try_submit_io(PopulateFn populateSqe) noexcept {
UNIFEX_ASSERT(is_running_on_io_thread());
if (pending_operation_count() < cqEntryCount_) {
// Haven't reached limit of completion-queue yet.
const auto tail = sqTail_->load(std::memory_order_relaxed);
const auto head = sqHead_->load(std::memory_order_acquire);
const auto usedCount = (tail - head);
UNIFEX_ASSERT(usedCount <= sqEntryCount_);
if (usedCount < sqEntryCount_) {
// There is space in the submission-queue.
const auto index = tail & sqMask_;
auto& sqe = sqEntries_[index];
static_assert(noexcept(populateSqe(sqe)));
// Zero-initialise the SQE before populating it.
std::memset(&sqe, 0, sizeof(sqe));
if constexpr (std::is_void_v<decltype(populateSqe(sqe))>) {
populateSqe(sqe);
} else {
if (!populateSqe(sqe)) {
return false;
}
}
sqIndexArray_[index] = index;
sqTail_->store(tail + 1, std::memory_order_release);
++sqUnflushedCount_;
return true;
}
}
return false;
}
class io_uring_context::schedule_sender {
template <typename Receiver>
class operation : private operation_base {
public:
void start() noexcept {
UNIFEX_TRY {
context_.schedule_impl(this);
} UNIFEX_CATCH (...) {
unifex::set_error(
static_cast<Receiver&&>(receiver_), std::current_exception());
}
}
private:
friend schedule_sender;
template <typename Receiver2>
explicit operation(io_uring_context& context, Receiver2&& r)
: context_(context), receiver_((Receiver2 &&) r) {
this->execute_ = &execute_impl;
}
static void execute_impl(operation_base* p) noexcept {
operation& op = *static_cast<operation*>(p);
if constexpr (!is_stop_never_possible_v<stop_token_type_t<Receiver>>) {
if (get_stop_token(op.receiver_).stop_requested()) {
unifex::set_done(static_cast<Receiver&&>(op.receiver_));
return;
}
}
if constexpr (noexcept(unifex::set_value(static_cast<Receiver&&>(op.receiver_)))) {
unifex::set_value(static_cast<Receiver&&>(op.receiver_));
} else {
UNIFEX_TRY {
unifex::set_value(static_cast<Receiver&&>(op.receiver_));
} UNIFEX_CATCH (...) {
unifex::set_error(static_cast<Receiver&&>(op.receiver_), std::current_exception());
}
}
}
io_uring_context& context_;
Receiver receiver_;
};
public:
template <
template <typename...> class Variant,
template <typename...> class Tuple>
using value_types = Variant<Tuple<>>;
template <template <typename...> class Variant>
using error_types = Variant<std::exception_ptr>;
static constexpr bool sends_done = true;
template <typename Receiver>
operation<std::remove_reference_t<Receiver>> connect(Receiver&& r) {
return operation<std::remove_reference_t<Receiver>>{context_,
(Receiver &&) r};
}
private:
friend io_uring_context::scheduler;
explicit schedule_sender(io_uring_context& context) noexcept
: context_(context) {}
io_uring_context& context_;
};
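// Sender that performs an asynchronous read of a file descriptor at a
// given offset, submitted to the kernel as an IORING_OP_READV with a
// single iovec.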
class io_uring_context::read_sender {
using offset_t = std::int64_t;
template <typename Receiver>
class operation : private completion_base {
friend io_uring_context;
public:
template <typename Receiver2>
explicit operation(const read_sender& sender, Receiver2&& r)
: context_(sender.context_),
fd_(sender.fd_),
offset_(sender.offset_),
receiver_((Receiver2 &&) r) {
buffer_[0].iov_base = sender.buffer_.data();
buffer_[0].iov_len = sender.buffer_.size();
}
void start() noexcept {
if (!context_.is_running_on_io_thread()) {
this->execute_ = &operation::on_schedule_complete;
context_.schedule_remote(this);
} else {
start_io();
}
}
private:
static void on_schedule_complete(operation_base* op) noexcept {
static_cast<operation*>(op)->start_io();
}
void start_io() noexcept {
UNIFEX_ASSERT(context_.is_running_on_io_thread());
auto populateSqe = [this](io_uring_sqe& sqe) noexcept {
sqe.opcode = IORING_OP_READV;
sqe.fd = fd_;
sqe.off = offset_;
sqe.addr = reinterpret_cast<std::uintptr_t>(&buffer_[0]);
sqe.len = 1;
sqe.user_data = reinterpret_cast<std::uintptr_t>(
static_cast<completion_base*>(this));
this->execute_ = &operation::on_read_complete;
};
if (!context_.try_submit_io(populateSqe)) {
this->execute_ = &operation::on_schedule_complete;
context_.schedule_pending_io(this);
}
}
static void on_read_complete(operation_base* op) noexcept {
auto& self = *static_cast<operation*>(op);
if (self.result_ >= 0) {
if constexpr (noexcept(unifex::set_value(std::move(self.receiver_), ssize_t(self.result_)))) {
unifex::set_value(std::move(self.receiver_), ssize_t(self.result_));
} else {
UNIFEX_TRY {
unifex::set_value(std::move(self.receiver_), ssize_t(self.result_));
} UNIFEX_CATCH (...) {
unifex::set_error(std::move(self.receiver_), std::current_exception());
}
}
} else if (self.result_ == -ECANCELED) {
unifex::set_done(std::move(self.receiver_));
} else {
unifex::set_error(
std::move(self.receiver_),
std::error_code{-self.result_, std::system_category()});
}
}
io_uring_context& context_;
int fd_;
offset_t offset_;
iovec buffer_[1];
Receiver receiver_;
};
public:
// Produces number of bytes read.
template <
template <typename...> class Variant,
template <typename...> class Tuple>
using value_types = Variant<Tuple<ssize_t>>;
// Note: the only case in which this completes with exception_ptr is if
// the receiver's set_value() exits with an exception.
template <template <typename...> class Variant>
using error_types = Variant<std::error_code, std::exception_ptr>;
static constexpr bool sends_done = true;
explicit read_sender(
io_uring_context& context,
int fd,
offset_t offset,
span<std::byte> buffer) noexcept
: context_(context), fd_(fd), offset_(offset), buffer_(buffer) {}
template <typename Receiver>
operation<remove_cvref_t<Receiver>> connect(Receiver&& r) && {
return operation<remove_cvref_t<Receiver>>{*this, (Receiver &&) r};
}
private:
io_uring_context& context_;
int fd_;
offset_t offset_;
span<std::byte> buffer_;
};
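// Sender that performs an asynchronous write of a file descriptor at a
// given offset, submitted to the kernel as an IORING_OP_WRITEV with a
// single iovec.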
class io_uring_context::write_sender {
using offset_t = std::int64_t;
template <typename Receiver>
class operation : private completion_base {
friend io_uring_context;
public:
template <typename Receiver2>
explicit operation(const write_sender& sender, Receiver2&& r)
: context_(sender.context_),
fd_(sender.fd_),
offset_(sender.offset_),
receiver_((Receiver2 &&) r) {
buffer_[0].iov_base = (void*)sender.buffer_.data();
buffer_[0].iov_len = sender.buffer_.size();
}
void start() noexcept {
if (!context_.is_running_on_io_thread()) {
this->execute_ = &operation::on_schedule_complete;
context_.schedule_remote(this);
} else {
start_io();
}
}
private:
static void on_schedule_complete(operation_base* op) noexcept {
static_cast<operation*>(op)->start_io();
}
void start_io() noexcept {
UNIFEX_ASSERT(context_.is_running_on_io_thread());
auto populateSqe = [this](io_uring_sqe& sqe) noexcept {
sqe.opcode = IORING_OP_WRITEV;
sqe.fd = fd_;
sqe.off = offset_;
sqe.addr = reinterpret_cast<std::uintptr_t>(&buffer_[0]);
sqe.len = 1;
sqe.user_data = reinterpret_cast<std::uintptr_t>(
static_cast<completion_base*>(this));
this->execute_ = &operation::on_write_complete;
};
if (!context_.try_submit_io(populateSqe)) {
this->execute_ = &operation::on_schedule_complete;
context_.schedule_pending_io(this);
}
}
static void on_write_complete(operation_base* op) noexcept {
auto& self = *static_cast<operation*>(op);
if (self.result_ >= 0) {
if constexpr (noexcept(unifex::set_value(std::move(self.receiver_), ssize_t(self.result_)))) {
unifex::set_value(std::move(self.receiver_), ssize_t(self.result_));
} else {
UNIFEX_TRY {
unifex::set_value(std::move(self.receiver_), ssize_t(self.result_));
} UNIFEX_CATCH (...) {
unifex::set_error(std::move(self.receiver_), std::current_exception());
}
}
} else if (self.result_ == -ECANCELED) {
unifex::set_done(std::move(self.receiver_));
} else {
unifex::set_error(
std::move(self.receiver_),
std::error_code{-self.result_, std::system_category()});
}
}
io_uring_context& context_;
int fd_;
offset_t offset_;
iovec buffer_[1];
Receiver receiver_;
};
public:
// Produces number of bytes written.
template <
template <typename...> class Variant,
template <typename...> class Tuple>
using value_types = Variant<Tuple<ssize_t>>;
// Note: the only case in which this completes with exception_ptr is if
// the receiver's set_value() exits with an exception.
template <template <typename...> class Variant>
using error_types = Variant<std::error_code, std::exception_ptr>;
static constexpr bool sends_done = true;
explicit write_sender(
io_uring_context& context,
int fd,
offset_t offset,
span<const std::byte> buffer) noexcept
: context_(context), fd_(fd), offset_(offset), buffer_(buffer) {}
template <typename Receiver>
operation<remove_cvref_t<Receiver>> connect(Receiver&& r) {
return operation<remove_cvref_t<Receiver>>{*this, (Receiver &&) r};
}
private:
io_uring_context& context_;
int fd_;
offset_t offset_;
span<const std::byte> buffer_;
};
class io_uring_context::async_read_only_file {
public:
using offset_t = std::int64_t;
explicit async_read_only_file(io_uring_context& context, int fd) noexcept
: context_(context), fd_(fd) {}
private:
friend scheduler;
friend read_sender tag_invoke(
tag_t<async_read_some_at>,
async_read_only_file& file,
offset_t offset,
span<std::byte> buffer) noexcept {
return read_sender{file.context_, file.fd_.get(), offset, buffer};
}
io_uring_context& context_;
safe_file_descriptor fd_;
};
class io_uring_context::async_write_only_file {
public:
using offset_t = std::int64_t;
explicit async_write_only_file(io_uring_context& context, int fd) noexcept
: context_(context), fd_(fd) {}
private:
friend scheduler;
friend write_sender tag_invoke(
tag_t<async_write_some_at>,
async_write_only_file& file,
offset_t offset,
span<const std::byte> buffer) noexcept {
return write_sender{file.context_, file.fd_.get(), offset, buffer};
}
io_uring_context& context_;
safe_file_descriptor fd_;
};
class io_uring_context::async_read_write_file {
public:
using offset_t = std::int64_t;
explicit async_read_write_file(io_uring_context& context, int fd) noexcept
: context_(context), fd_(fd) {}
private:
friend scheduler;
friend write_sender tag_invoke(
tag_t<async_write_some_at>,
async_read_write_file& file,
offset_t offset,
span<const std::byte> buffer) noexcept {
return write_sender{file.context_, file.fd_.get(), offset, buffer};
}
friend read_sender tag_invoke(
tag_t<async_read_some_at>,
async_read_write_file& file,
offset_t offset,
span<std::byte> buffer) noexcept {
return read_sender{file.context_, file.fd_.get(), offset, buffer};
}
io_uring_context& context_;
safe_file_descriptor fd_;
};
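// Example of reading from a file (a minimal sketch; assumes the
// unifex::sync_wait() algorithm and a hypothetical path "/tmp/data"):
//
//   auto sched = ctx.get_scheduler();
//   auto file = open_file_read_only(sched, "/tmp/data");
//   std::array<std::byte, 1024> buf;
//   auto bytesRead = sync_wait(async_read_some_at(
//       file, /*offset=*/0, span<std::byte>{buf.data(), buf.size()}));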
class io_uring_context::schedule_at_sender {
template <typename Receiver>
struct operation : schedule_at_operation {
static constexpr bool is_stop_ever_possible =
!is_stop_never_possible_v<stop_token_type_t<Receiver>>;
public:
explicit operation(
io_uring_context& context,
const time_point& dueTime,
Receiver&& r)
: schedule_at_operation(
context,
dueTime,
get_stop_token(r).stop_possible()),
receiver_((Receiver &&) r) {}
void start() noexcept {
if (this->context_.is_running_on_io_thread()) {
start_local();
} else {
start_remote();
}
}
private:
static void on_schedule_complete(operation_base* op) noexcept {
static_cast<operation*>(op)->start_local();
}
static void complete_with_done(operation_base* op) noexcept {
// Avoid instantiating set_done() if we're not going to call it.
if constexpr (is_stop_ever_possible) {
auto& timerOp = *static_cast<operation*>(op);
unifex::set_done(std::move(timerOp).receiver_);
} else {
// This should never be called if stop is not possible.
UNIFEX_ASSERT(false);
}
}
// Executed when the timer gets to the front of the ready-to-run queue.
static void maybe_complete_with_value(operation_base* op) noexcept {
auto& timerOp = *static_cast<operation*>(op);
if constexpr (is_stop_ever_possible) {
timerOp.stopCallback_.destruct();
if (get_stop_token(timerOp.receiver_).stop_requested()) {
complete_with_done(op);
return;
}
}
if constexpr (noexcept(unifex::set_value(std::move(timerOp).receiver_))) {
unifex::set_value(std::move(timerOp).receiver_);
} else {
UNIFEX_TRY {
unifex::set_value(std::move(timerOp).receiver_);
} UNIFEX_CATCH (...) {
unifex::set_error(std::move(timerOp).receiver_, std::current_exception());
}
}
}
static void remove_timer_from_queue_and_complete_with_done(
operation_base* op) noexcept {
// Avoid instantiating set_done() if we're never going to call it.
if constexpr (is_stop_ever_possible) {
auto& timerOp = *static_cast<operation*>(op);
UNIFEX_ASSERT(timerOp.context_.is_running_on_io_thread());
timerOp.stopCallback_.destruct();
auto state = timerOp.state_.load(std::memory_order_relaxed);
if ((state & schedule_at_operation::timer_elapsed_flag) == 0) {
// Timer not yet removed from the timers_ list. Do that now.
timerOp.context_.remove_timer(&timerOp);
}
unifex::set_done(std::move(timerOp).receiver_);
} else {
// Should never be called if stop is not possible.
UNIFEX_ASSERT(false);
}
}
void start_local() noexcept {
if constexpr (is_stop_ever_possible) {
if (get_stop_token(receiver_).stop_requested()) {
// Stop already requested. Don't bother adding the timer.
this->execute_ = &operation::complete_with_done;
this->context_.schedule_local(this);
return;
}
}
this->execute_ = &operation::maybe_complete_with_value;
this->context_.schedule_at_impl(this);
if constexpr (is_stop_ever_possible) {
stopCallback_.construct(
get_stop_token(receiver_), cancel_callback{*this});
}
}
void start_remote() noexcept {
this->execute_ = &operation::on_schedule_complete;
this->context_.schedule_remote(this);
}
void request_stop() noexcept {
if (context_.is_running_on_io_thread()) {
request_stop_local();
} else {
request_stop_remote();
}
}
void request_stop_local() noexcept {
UNIFEX_ASSERT(context_.is_running_on_io_thread());
stopCallback_.destruct();
this->execute_ = &operation::complete_with_done;
auto state = this->state_.load(std::memory_order_relaxed);
if ((state & schedule_at_operation::timer_elapsed_flag) == 0) {
// Timer not yet elapsed.
// Remove timer from list of timers and enqueue cancellation.
context_.remove_timer(this);
context_.schedule_local(this);
} else {
// Timer already elapsed and added to ready-to-run queue.
}
}
void request_stop_remote() noexcept {
auto oldState = this->state_.fetch_add(
schedule_at_operation::cancel_pending_flag,
std::memory_order_acq_rel);
if ((oldState & schedule_at_operation::timer_elapsed_flag) == 0) {
// Timer had not yet elapsed.
// We are responsible for scheduling the completion of this timer
// operation.
this->execute_ =
&operation::remove_timer_from_queue_and_complete_with_done;
this->context_.schedule_remote(this);
}
}
struct cancel_callback {
operation& op_;
void operator()() noexcept {
op_.request_stop();
}
};
Receiver receiver_;
manual_lifetime<typename stop_token_type_t<
Receiver>::template callback_type<cancel_callback>>
stopCallback_;
};
public:
template <
template <typename...> class Variant,
template <typename...> class Tuple>
using value_types = Variant<Tuple<>>;
// Note: the only case in which this completes with exception_ptr is if
// the receiver's set_value() exits with an exception.
template <template <typename...> class Variant>
using error_types = Variant<std::exception_ptr>;
static constexpr bool sends_done = true;
explicit schedule_at_sender(
io_uring_context& context,
const time_point& dueTime) noexcept
: context_(context), dueTime_(dueTime) {}
template <typename Receiver>
operation<remove_cvref_t<Receiver>> connect(Receiver&& r) {
return operation<remove_cvref_t<Receiver>>{
context_, dueTime_, (Receiver &&) r};
}
private:
io_uring_context& context_;
time_point dueTime_;
};
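// A cheaply copyable handle to the context. Provides schedule() for
// running plain work on the I/O thread, schedule_at() for timed work,
// and (via the tag_invoke overloads below) the open_file_read_only /
// open_file_write_only / open_file_read_write operations.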
class io_uring_context::scheduler {
public:
scheduler(const scheduler&) noexcept = default;
scheduler& operator=(const scheduler&) = default;
~scheduler() = default;
schedule_sender schedule() const noexcept {
return schedule_sender{*context_};
}
time_point now() const noexcept {
return monotonic_clock::now();
}
schedule_at_sender schedule_at(const time_point& dueTime) const noexcept {
return schedule_at_sender{*context_, dueTime};
}
private:
friend io_uring_context;
friend async_read_only_file tag_invoke(
tag_t<open_file_read_only>,
scheduler s,
const filesystem::path& path);
friend async_read_write_file tag_invoke(
tag_t<open_file_read_write>,
scheduler s,
const filesystem::path& path);
friend async_write_only_file tag_invoke(
tag_t<open_file_write_only>,
scheduler s,
const filesystem::path& path);
friend bool operator==(scheduler a, scheduler b) noexcept {
return a.context_ == b.context_;
}
friend bool operator!=(scheduler a, scheduler b) noexcept {
return a.context_ != b.context_;
}
explicit scheduler(io_uring_context& context) noexcept : context_(&context) {}
io_uring_context* context_;
};
inline io_uring_context::scheduler io_uring_context::get_scheduler() noexcept {
return scheduler{*this};
}
} // namespace linuxos
} // namespace unifex
#include <unifex/detail/epilogue.hpp>
#endif // !UNIFEX_NO_LIBURING