// source/linux/io_uring_context.cpp
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License Version 2.0 with LLVM Exceptions
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://llvm.org/LICENSE.txt
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <unifex/config.hpp>
#if !UNIFEX_NO_LIBURING
#include <unifex/linux/io_uring_context.hpp>
#include <unifex/scope_guard.hpp>
#include <unifex/exception.hpp>
#include "io_uring_syscall.hpp"
#include <cstring>
#include <system_error>
#include <fcntl.h>
#include <poll.h>
#include <sys/eventfd.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <time.h>
#include <unistd.h>
#include <cstdio>
//#define LOGGING_ENABLED
#ifdef LOGGING_ENABLED
#define LOG(S) \
do { \
::std::puts(S); \
::std::fflush(stdout); \
} while (false)
#define LOGX(...) \
do { \
::std::printf(__VA_ARGS__); \
::std::fflush(stdout); \
} while (false)
#else
#define LOG(S) \
do { \
} while (false)
#define LOGX(...) \
do { \
} while (false)
#endif
/////////////////////////////////////////////////////
// io_uring structures
//
// An io_uring is a pair of circular buffers that are used for submitting
// asynchronous operations to the kernel and receiving completion notifications.
//
// To start an asynchronous operation the I/O thread writes an entry to the
// "submission queue" ring buffer containing the parameters for the syscall.
//
// When the operation completes, the kernel will write a completion notification
// to the "completion queue". These do not necessarily complete in the same
// order that they were submitted.
//
// Note that both ring buffers have a power-of-two size and use separate
// atomic head/tail indices to indicate the front/back of the queue
// respectively. These indices increase monotonically and must be masked with
// the ring mask to obtain an index into the entry array.
//
// 'head' contains the index of the front of the queue.
// Consumers should read this entry and advance 'head' once they have finished
// processing it.
//
// 'tail' contains the index of the next slot to be written to.
// It is one-past-the-end of the set of valid entries in the ring-buffer.
// Producers should write to this entry and advance 'tail' once written to
// publish the entry.
//
// Constraints:
// - head <= tail
// - tail - head <= entry-count
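//
// For example, a minimal sketch of the index arithmetic from the producer's
// side (the names here are illustrative, not the members used later in this
// file):
//
//   unsigned head = headIndex->load(std::memory_order_acquire); // other side
//   unsigned tail = tailIndex->load(std::memory_order_relaxed); // we own this
//   unsigned used = tail - head;          // wraps correctly for unsigned
//   unsigned free = entryCount - used;    // slots the producer may still fill
//   auto& slot = entries[tail & mask];    // mask == entryCount - 1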
//
// submission queue
// ----------------
// // 64-byte structure (fits in a single cache-line)
// struct io_uring_sqe {
// __u8 opcode; /* type of operation for this sqe */
// __u8 flags; /* IOSQE_ flags */
// __u16 ioprio; /* ioprio for the request */
// __s32 fd; /* file descriptor to do IO on */
// union {
// __u64 off; /* offset into file */
// __u64 addr2;
// };
// union {
// __u64 addr; /* pointer to buffer or iovecs */
// __u64 splice_off_in;
// };
// __u32 len; /* buffer size or number of iovecs */
// union {
// __kernel_rwf_t rw_flags;
// __u32 fsync_flags;
// __u16 poll_events; /* compatibility */
// __u32 poll32_events; /* word-reversed for BE */
// __u32 sync_range_flags;
// __u32 msg_flags;
// __u32 timeout_flags;
// __u32 accept_flags;
// __u32 cancel_flags;
// __u32 open_flags;
// __u32 statx_flags;
// __u32 fadvise_advice;
// __u32 splice_flags;
// __u32 rename_flags;
// __u32 unlink_flags;
// __u32 hardlink_flags;
// };
// __u64 user_data; /* data to be passed back at completion time */
// /* pack this to avoid bogus arm OABI complaints */
// union {
// /* index into fixed buffers, if used */
// __u16 buf_index;
// /* for grouped buffer selection */
// __u16 buf_group;
// } __attribute__((packed));
// /* personality to use, if used */
// __u16 personality;
// union {
// __s32 splice_fd_in;
// __u32 file_index;
// };
// __u64 __pad2[2];
// };
//
// io_uring_params
// - sq_entries - number of entries in the submission queue
// - sq_off
// __u32 head; - offset of ring head index (first valid entry)
// __u32 tail;
// __u32 ring_mask;
// __u32 ring_entries;
// __u32 flags;
// __u32 dropped;
// __u32 array;
// __u32 resv1;
// __u32 resv2;
//
// The submission queue has two memory-mapped regions: one for the control
// data for the queue and another for the actual submission queue entries.
//
// Memory mapped region for io_uring fd at offset IORING_OFF_SQ_RING
//
// SQ ring: control data followed by the SQ index array (unsigned entries)
//
//   +----------------------------------------+---------------------+
//   | head | tail | dropped | flags |  ...   |  index array ...    |
//   +----------------------------------------+---------------------+
//
// sq_off.head, sq_off.tail, sq_off.dropped, sq_off.flags and sq_off.array
// give the byte offsets of these fields within the mapped region.
//
// Memory mapped region for io_uring fd at offset IORING_OFF_SQES
//
// SQE array (io_uring_sqe)
// +-------------------------+
// | | | | | ... | |
// +-------------------------+
//
// Elements of the SQ index array are indices into the SQE array.
//
// When submitting an I/O request you need to:
// - write to a free entry in SQE array
// - write the index of that entry into the SQ index array at offset 'tail'
// - advance 'tail' to publish the entry
//
// Then, depending on which mode the io_uring is operating in, you may need
// to flush these entries by calling io_uring_enter(), passing the number of
// entries to flush.
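//
// A minimal sketch of publishing a single SQE from the submitting thread
// (illustrative local names; compare try_submit_io(), which this file uses
// below):
//
//   unsigned tail = sqTail->load(std::memory_order_relaxed);
//   unsigned head = sqHead->load(std::memory_order_acquire);
//   if (tail - head < sqEntryCount) {       // at least one free slot
//     unsigned index = tail & sqMask;
//     io_uring_sqe& sqe = sqEntries[index];
//     std::memset(&sqe, 0, sizeof(sqe));
//     sqe.opcode = IORING_OP_NOP;           // fill in the real request here
//     sqe.user_data = 0;                    // or a pointer-sized tag
//     sqIndexArray[index] = index;          // SQ index array entry
//     sqTail->store(tail + 1, std::memory_order_release);  // publish
//     ++unflushedCount;                     // flushed later via io_uring_enter()
//   }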
//
// If the io_uring was created with the flag IORING_SETUP_SQPOLL then the
// kernel will create a thread to poll for new SQEs. The kernel thread will
// spin waiting for new SQEs to be published for a while and will then go to
// sleep.
// If the kernel thread goes to sleep it will set the IORING_SQ_NEED_WAKEUP
// flag in the sqring's 'flags' field. Applications must check for this flag
// and call io_uring_enter() with IORING_ENTER_SQ_WAKEUP flag set in the flags
// parameter to wake up the kernel thread to start processing items again.
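//
// This context does not create its ring with IORING_SETUP_SQPOLL, but for
// reference that wake-up check would look roughly like:
//
//   if (sqFlags->load(std::memory_order_acquire) & IORING_SQ_NEED_WAKEUP) {
//     io_uring_enter(ringFd, 0, 0, IORING_ENTER_SQ_WAKEUP, nullptr);
//   }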
//
// completion queue
// ----------------
// struct io_uring_cqe {
// __u64 user_data; // arbitrary user-data from submission
// __s32 res; // result, return-value of syscall
// __u32 flags; // metadata - currently unused?
// };
//
// io_uring_params
// - cq_entries - number of entries in the completion queue
// - cq_off
// __u32 head; - offset of ring head (first valid entry)
// __u32 tail; - offset of ring tail (one past the last valid entry)
// __u32 ring_mask; - mask to apply to head/tail to get array index
// __u32 ring_entries; - number of entries in ring (same as cq_entries?)
// __u32 overflow;
// __u32 cqes; - offset of the CQE array
// __u64 resv[2];
//
// Memory mapped region for io_uring fd at offset IORING_OFF_CQ_RING
//
// CQ ring: control data followed by the CQE array (io_uring_cqe entries)
//
//   +-----------------------------------+------------------------+
//   | head | tail | overflow |  ...     |  CQE array ...         |
//   +-----------------------------------+------------------------+
//
// cq_off.head, cq_off.tail, cq_off.overflow and cq_off.cqes give the byte
// offsets of these fields within the mapped region.
//
// When an operation completes the kernel will write an entry to the next
// free slot (at index 'tail') in the CQE array and then publish this entry
// by incrementing 'tail'.
//
// An application should periodically poll 'head' and 'tail' to detect new
// entries being added and advance 'head' once they have finished processing
// the new entries.
//
// If the application otherwise does not have anything to do on the I/O thread
// then it can block, waiting for new entries to be added by calling
// io_uring_enter(), passing the IORING_ENTER_GETEVENTS flag and passing a
// non-zero value for 'min_complete'.
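//
// A minimal sketch of draining the completion queue (illustrative names; the
// real version is acquire_completion_queue_items() below, and
// handle_completion() is just a placeholder):
//
//   unsigned head = cqHead->load(std::memory_order_relaxed); // only we write it
//   unsigned tail = cqTail->load(std::memory_order_acquire); // kernel publishes
//   while (head != tail) {
//     const io_uring_cqe& cqe = cqEntries[head & cqMask];
//     handle_completion(cqe.user_data, cqe.res);
//     ++head;
//   }
//   cqHead->store(head, std::memory_order_release);  // hand the slots back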
//
//
// Structure of the io_uring_context
// ---------------------------------
// Assumes a single thread that submits I/O and processes completion events.
// This is the thread that calls run().
//
// When a thread schedules work using 'schedule()' it takes one of two paths
// depending on whether it is being submitted from the I/O thread or a remote
// thread.
//
// If it is submitted from the I/O thread then it is just immediately appended
// to the io_uring_context's queue of ready-to-run operations. It will be
// processed next time around the run() loop.
//
// If it is submitted from a remote thread then we perform a lock-free push
// of the item onto a queue of remotely scheduled items.
//
// If the I/O thread becomes idle then it marks the queue with an
// 'inactive consumer' flag and submits an IORING_OP_POLL_ADD operation on an
// eventfd object.
//
// The next time a remote thread enqueues an item to the queue it will see and
// clear this 'inactive consumer' flag and then signal the eventfd by writing
// to it to wake up the I/O thread.
//
// This will cause an I/O completion event for the POLL operation to be posted
// to the completion queue, waking the I/O thread, which will then acquire
// the list of remotely scheduled items and add them to the list of
// ready-to-run operations.
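//
// A minimal sketch of the producer side of that hand-off (illustrative names;
// the real versions are schedule_remote() and signal_remote_queue() below):
//
//   bool consumerIsInactive = remoteQueue.enqueue(op);  // lock-free push
//   if (consumerIsInactive) {
//     // We cleared the 'inactive consumer' flag; wake the I/O thread, which
//     // is waiting on a POLL_ADD completion for the eventfd.
//     const __u64 one = 1;
//     (void)::write(eventFd, &one, sizeof(one));
//   }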
namespace unifex::linuxos {
static thread_local io_uring_context* currentThreadContext;
static constexpr __u64 remote_queue_event_user_data = 0;
io_uring_context::io_uring_context() {
io_uring_params params;
std::memset(&params, 0, sizeof(params));
int ret = io_uring_setup(256, &params);
if (ret < 0) {
throw_(std::system_error{-ret, std::system_category()});
}
iouringFd_ = safe_file_descriptor{ret};
{
auto cqSize = params.cq_off.cqes + params.cq_entries * sizeof(io_uring_cqe);
void* cqPtr = mmap(
0,
cqSize,
PROT_READ | PROT_WRITE,
MAP_SHARED | MAP_POPULATE,
iouringFd_.get(),
IORING_OFF_CQ_RING);
if (cqPtr == MAP_FAILED) {
int errorCode = errno;
throw_(std::system_error{errorCode, std::system_category()});
}
cqMmap_ = mmap_region{cqPtr, cqSize};
char* cqBlock = static_cast<char*>(cqPtr);
cqEntryCount_ = params.cq_entries;
UNIFEX_ASSERT(
cqEntryCount_ ==
*reinterpret_cast<unsigned*>(
cqBlock +
params.cq_off.ring_entries)); // Is this a valid assumption?
cqMask_ = *reinterpret_cast<unsigned*>(cqBlock + params.cq_off.ring_mask);
UNIFEX_ASSERT(cqMask_ == (cqEntryCount_ - 1));
cqHead_ =
reinterpret_cast<std::atomic<unsigned>*>(cqBlock + params.cq_off.head);
cqTail_ =
reinterpret_cast<std::atomic<unsigned>*>(cqBlock + params.cq_off.tail);
cqOverflow_ = reinterpret_cast<std::atomic<unsigned>*>(
cqBlock + params.cq_off.overflow);
cqEntries_ = reinterpret_cast<io_uring_cqe*>(cqBlock + params.cq_off.cqes);
}
{
auto sqSize = params.sq_off.array + params.sq_entries * sizeof(__u32);
void* sqPtr = mmap(
0,
sqSize,
PROT_READ | PROT_WRITE,
MAP_SHARED | MAP_POPULATE,
iouringFd_.get(),
IORING_OFF_SQ_RING);
if (sqPtr == MAP_FAILED) {
int errorCode = errno;
throw_(std::system_error{errorCode, std::system_category()});
}
sqMmap_ = mmap_region{sqPtr, sqSize};
char* sqBlock = static_cast<char*>(sqPtr);
sqEntryCount_ = params.sq_entries;
UNIFEX_ASSERT(
sqEntryCount_ ==
*reinterpret_cast<unsigned*>(
sqBlock +
params.sq_off.ring_entries)); // Is this a valid assumption?
sqMask_ = *reinterpret_cast<unsigned*>(sqBlock + params.sq_off.ring_mask);
UNIFEX_ASSERT(sqMask_ == (sqEntryCount_ - 1));
sqHead_ =
reinterpret_cast<std::atomic<unsigned>*>(sqBlock + params.sq_off.head);
sqTail_ =
reinterpret_cast<std::atomic<unsigned>*>(sqBlock + params.sq_off.tail);
sqFlags_ =
reinterpret_cast<std::atomic<unsigned>*>(sqBlock + params.sq_off.flags);
sqDropped_ = reinterpret_cast<std::atomic<unsigned>*>(
sqBlock + params.sq_off.dropped);
sqIndexArray_ = reinterpret_cast<unsigned*>(sqBlock + params.sq_off.array);
}
{
auto sqeSize = params.sq_entries * sizeof(io_uring_sqe);
void* sqePtr = mmap(
0,
sqeSize,
PROT_READ | PROT_WRITE,
MAP_SHARED | MAP_POPULATE,
iouringFd_.get(),
IORING_OFF_SQES);
if (sqePtr == MAP_FAILED) {
int errorCode = errno;
throw_(std::system_error{errorCode, std::system_category()});
}
sqeMmap_ = mmap_region{sqePtr, sqeSize};
sqEntries_ = reinterpret_cast<io_uring_sqe*>(sqePtr);
}
{
int fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
if (fd < 0) {
int errorCode = errno;
throw_(std::system_error{errorCode, std::system_category()});
}
remoteQueueEventFd_ = safe_file_descriptor{fd};
}
LOG("io_uring_context construction done");
}
io_uring_context::~io_uring_context() {}
void io_uring_context::run_impl(const bool& shouldStop) {
LOG("run loop started");
auto* oldContext = std::exchange(currentThreadContext, this);
scope_guard g = [=]() noexcept {
std::exchange(currentThreadContext, oldContext);
LOG("run loop exited");
};
while (true) {
// Dequeue and process local queue items (ready to run)
execute_pending_local();
if (shouldStop) {
break;
}
// Check for any new completion-queue items.
acquire_completion_queue_items();
if (timersAreDirty_) {
update_timers();
}
// Check for remotely-queued items.
// Only do this if we haven't submitted a poll operation for the
// completion queue - in which case we'll just wait until we receive the
// completion-queue item.
if (!remoteQueueReadSubmitted_) {
acquire_remote_queued_items();
}
// Process I/O requests that were waiting for space to become available in
// either the submission queue or the completion queue.
while (!pendingIoQueue_.empty() && can_submit_io()) {
auto* item = pendingIoQueue_.pop_front();
item->execute_(item);
}
if (localQueue_.empty() || sqUnflushedCount_ > 0) {
const bool isIdle = sqUnflushedCount_ == 0 && localQueue_.empty();
if (isIdle) {
if (!remoteQueueReadSubmitted_) {
LOG("try_register_remote_queue_notification()");
remoteQueueReadSubmitted_ = try_register_remote_queue_notification();
}
}
int minCompletionCount = 0;
unsigned flags = 0;
if (isIdle &&
(remoteQueueReadSubmitted_ ||
pending_operation_count() == cqEntryCount_)) {
// No work to do until we receive a completion event.
minCompletionCount = 1;
flags = IORING_ENTER_GETEVENTS;
}
LOGX(
"io_uring_enter() - submit %u, wait for %i, pending %u\n",
sqUnflushedCount_,
minCompletionCount,
pending_operation_count());
int result = io_uring_enter(
iouringFd_.get(),
sqUnflushedCount_,
minCompletionCount,
flags,
nullptr);
if (result < 0) {
int errorCode = errno;
throw_(std::system_error{errorCode, std::system_category()});
}
LOG("io_uring_enter() returned");
sqUnflushedCount_ -= result;
cqPendingCount_ += result;
}
}
}
bool io_uring_context::is_running_on_io_thread() const noexcept {
return this == currentThreadContext;
}
void io_uring_context::schedule_impl(operation_base* op) {
UNIFEX_ASSERT(op != nullptr);
if (is_running_on_io_thread()) {
schedule_local(op);
} else {
schedule_remote(op);
}
}
void io_uring_context::schedule_local(operation_base* op) noexcept {
localQueue_.push_back(op);
}
void io_uring_context::schedule_local(operation_queue ops) noexcept {
localQueue_.append(std::move(ops));
}
void io_uring_context::schedule_remote(operation_base* op) noexcept {
bool ioThreadWasInactive = remoteQueue_.enqueue(op);
if (ioThreadWasInactive) {
// We were the first to queue an item and the I/O thread is not
// going to check the queue until we signal it that new items
// have been enqueued remotely by writing to the eventfd.
signal_remote_queue();
}
}
void io_uring_context::schedule_pending_io(operation_base* op) noexcept {
UNIFEX_ASSERT(is_running_on_io_thread());
pendingIoQueue_.push_back(op);
}
void io_uring_context::reschedule_pending_io(operation_base* op) noexcept {
UNIFEX_ASSERT(is_running_on_io_thread());
pendingIoQueue_.push_front(op);
}
void io_uring_context::schedule_at_impl(schedule_at_operation* op) noexcept {
UNIFEX_ASSERT(is_running_on_io_thread());
timers_.insert(op);
if (timers_.top() == op) {
timersAreDirty_ = true;
}
}
void io_uring_context::execute_pending_local() noexcept {
if (localQueue_.empty()) {
LOG("local queue is empty");
return;
}
LOG("processing local queue items");
size_t count = 0;
auto pending = std::move(localQueue_);
while (!pending.empty()) {
auto* item = pending.pop_front();
item->execute_(item);
++count;
}
LOGX("processed %zu local queue items\n", count);
}
void io_uring_context::acquire_completion_queue_items() noexcept {
// Use 'relaxed' load for the head since it should only ever
// be modified by the current thread.
std::uint32_t cqHead = cqHead_->load(std::memory_order_relaxed);
std::uint32_t cqTail = cqTail_->load(std::memory_order_acquire);
LOGX("completion queue head = %u, tail = %u\n", cqHead, cqTail);
if (cqHead != cqTail) {
const auto mask = cqMask_;
const auto count = cqTail - cqHead;
UNIFEX_ASSERT(count <= cqEntryCount_);
operation_base head;
LOGX("got %u completions\n", count);
operation_queue completionQueue;
for (std::uint32_t i = 0; i < count; ++i) {
auto& cqe = cqEntries_[(cqHead + i) & mask];
if (cqe.user_data == remote_queue_event_user_data) {
LOG("got remote queue wakeup");
if (cqe.res < 0) {
LOGX("remote queue wakeup failed err: %i\n", cqe.res);
// The poll operation on the eventfd failed.
// TODO: What to do here?
std::terminate();
}
// Read the eventfd to clear the signal.
__u64 buffer;
ssize_t bytesRead =
read(remoteQueueEventFd_.get(), &buffer, sizeof(buffer));
if (bytesRead < 0) {
// read() failed
[[maybe_unused]] int errorCode = errno;
LOGX("read on eventfd failed with %i\n", errorCode);
std::terminate();
}
UNIFEX_ASSERT(bytesRead == sizeof(buffer));
// Skip processing this item and let the loop check
// for the remote-queued items next time around.
remoteQueueReadSubmitted_ = false;
continue;
} else if (cqe.user_data == timer_user_data()) {
LOGX("got timer completion result %i\n", cqe.res);
UNIFEX_ASSERT(activeTimerCount_ > 0);
--activeTimerCount_;
LOGX("now %u active timers\n", activeTimerCount_);
if (cqe.res != ECANCELED) {
LOG("timer not cancelled, marking timers as dirty");
timersAreDirty_ = true;
}
if (activeTimerCount_ == 0) {
LOG("no more timers, resetting current due time");
currentDueTime_.reset();
}
continue;
} else if (cqe.user_data == remove_timer_user_data()) {
// Ignore timer cancellation completion.
continue;
}
auto& completionState = *reinterpret_cast<completion_base*>(
static_cast<std::uintptr_t>(cqe.user_data));
// Save the result in the completion state.
completionState.result_ = cqe.res;
// Add it to a temporary queue of newly completed items.
completionQueue.push_back(&completionState);
}
schedule_local(std::move(completionQueue));
// Mark those completion queue entries as consumed.
cqHead_->store(cqTail, std::memory_order_release);
cqPendingCount_ -= count;
}
}
void io_uring_context::acquire_remote_queued_items() noexcept {
UNIFEX_ASSERT(!remoteQueueReadSubmitted_);
auto items = remoteQueue_.dequeue_all();
LOG(items.empty() ? "remote queue is empty"
: "acquired items from remote queue");
schedule_local(std::move(items));
}
bool io_uring_context::try_register_remote_queue_notification() noexcept {
// Check that we haven't already hit the limit of pending
// I/O completion events.
const auto populateRemoteQueuePollSqe = [this](io_uring_sqe & sqe) noexcept {
auto queuedItems = remoteQueue_.try_mark_inactive_or_dequeue_all();
if (!queuedItems.empty()) {
schedule_local(std::move(queuedItems));
return false;
}
sqe.opcode = IORING_OP_POLL_ADD;
sqe.fd = remoteQueueEventFd_.get();
sqe.poll_events = POLL_IN;
sqe.user_data = remote_queue_event_user_data;
return true;
};
if (try_submit_io(populateRemoteQueuePollSqe)) {
LOG("added eventfd poll to submission queue");
return true;
}
return false;
}
void io_uring_context::signal_remote_queue() {
LOG("writing bytes to eventfd");
// Notify the eventfd by writing a 64-bit integer to it.
const __u64 value = 1;
ssize_t bytesWritten =
write(remoteQueueEventFd_.get(), &value, sizeof(value));
if (bytesWritten < 0) {
// What to do here? Terminate/abort/ignore?
// Try to dequeue the item before returning?
LOG("error writing to remote queue eventfd");
int errorCode = errno;
throw_(std::system_error{errorCode, std::system_category()});
}
UNIFEX_ASSERT(bytesWritten == sizeof(value));
}
void io_uring_context::remove_timer(schedule_at_operation* op) noexcept {
LOGX("remove_timer(%p)\n", (void*)op);
UNIFEX_ASSERT(!timers_.empty());
if (timers_.top() == op) {
timersAreDirty_ = true;
}
timers_.remove(op);
}
void io_uring_context::update_timers() noexcept {
LOG("update_timers()");
// Reap any elapsed timers.
if (!timers_.empty()) {
time_point now = monotonic_clock::now();
while (!timers_.empty() && timers_.top()->dueTime_ <= now) {
schedule_at_operation* item = timers_.pop();
LOGX("dequeued elapsed timer %p\n", (void*)item);
if (item->canBeCancelled_) {
auto oldState = item->state_.fetch_add(
schedule_at_operation::timer_elapsed_flag,
std::memory_order_acq_rel);
if ((oldState & schedule_at_operation::cancel_pending_flag) != 0) {
LOGX("timer already cancelled\n");
// Timer has been cancelled by a remote thread.
// The other thread is responsible for enqueueing its operation onto
// the remoteQueue_.
continue;
}
}
// Otherwise, we are responsible for enqueuing the timer onto the
// ready-to-run queue.
schedule_local(item);
}
}
// Check if we need to cancel or start some new OS timers.
if (timers_.empty()) {
if (currentDueTime_.has_value()) {
LOG("no more schedule_at requests, cancelling timer");
// Cancel the outstanding timer.
if (try_submit_timer_io_cancel()) {
currentDueTime_.reset();
timersAreDirty_ = false;
}
}
} else {
const auto earliestDueTime = timers_.top()->dueTime_;
if (currentDueTime_) {
constexpr auto threshold = std::chrono::microseconds(1);
if (earliestDueTime < (*currentDueTime_ - threshold)) {
LOG("active timer, need to cancel and submit an earlier one");
// An earlier time has been scheduled.
// Cancel the old timer before submitting a new one.
if (try_submit_timer_io_cancel()) {
currentDueTime_.reset();
if (try_submit_timer_io(earliestDueTime)) {
currentDueTime_ = earliestDueTime;
timersAreDirty_ = false;
}
}
} else {
timersAreDirty_ = false;
}
} else {
// No active timer, submit a new timer
LOG("no active timer, trying to submit a new one");
if (try_submit_timer_io(earliestDueTime)) {
currentDueTime_ = earliestDueTime;
timersAreDirty_ = false;
}
}
}
}
bool io_uring_context::try_submit_timer_io(const time_point& dueTime) noexcept {
auto populateSqe = [&](io_uring_sqe & sqe) noexcept {
sqe.opcode = IORING_OP_TIMEOUT;
sqe.addr = reinterpret_cast<std::uintptr_t>(&time_);
sqe.len = 1;
sqe.rw_flags =
1; // HACK: Should be 'sqe.timeout_flags = IORING_TIMEOUT_ABS'
sqe.user_data = timer_user_data();
time_.tv_sec = dueTime.seconds_part();
time_.tv_nsec = dueTime.nanoseconds_part();
};
if (try_submit_io(populateSqe)) {
++activeTimerCount_;
return true;
}
return false;
}
bool io_uring_context::try_submit_timer_io_cancel() noexcept {
auto populateSqe = [&](io_uring_sqe & sqe) noexcept {
sqe.opcode = 12; // IORING_OP_TIMEOUT_REMOVE;
sqe.addr = timer_user_data();
sqe.user_data = remove_timer_user_data();
};
return try_submit_io(populateSqe);
}
io_uring_context::async_read_only_file tag_invoke(
tag_t<open_file_read_only>,
io_uring_context::scheduler scheduler,
const filesystem::path& path) {
int result = ::open(path.c_str(), O_RDONLY | O_CLOEXEC);
if (result < 0) {
int errorCode = errno;
throw_(std::system_error{errorCode, std::system_category()});
}
return io_uring_context::async_read_only_file{*scheduler.context_, result};
}
io_uring_context::async_write_only_file tag_invoke(
tag_t<open_file_write_only>,
io_uring_context::scheduler scheduler,
const filesystem::path& path) {
int result = ::open(path.c_str(), O_WRONLY | O_CREAT | O_CLOEXEC, 0644);
if (result < 0) {
int errorCode = errno;
throw_(std::system_error{errorCode, std::system_category()});
}
return io_uring_context::async_write_only_file{*scheduler.context_, result};
}
io_uring_context::async_read_write_file tag_invoke(
tag_t<open_file_read_write>,
io_uring_context::scheduler scheduler,
const filesystem::path& path) {
int result = ::open(path.c_str(), O_RDWR | O_CREAT | O_CLOEXEC, 0644);
if (result < 0) {
int errorCode = errno;
throw_(std::system_error{errorCode, std::system_category()});
}
return io_uring_context::async_read_write_file{*scheduler.context_, result};
}
} // namespace unifex::linuxos
#endif // UNIFEX_NO_LIBURING