in include/unifex/linux/io_epoll_context.hpp [763:809]
void start_io() noexcept {
UNIFEX_ASSERT(context_.is_running_on_io_thread());
auto result = writev(fd_, buffer_, 1);
if (result == -EAGAIN || result == -EWOULDBLOCK || result == -EPERM) {
if constexpr (is_stop_ever_possible) {
stopCallback_.construct(
get_stop_token(receiver_), cancel_callback{*this});
}
UNIFEX_ASSERT(static_cast<completion_base*>(this)->enqueued_.load() == 0);
static_cast<completion_base*>(this)->execute_ = &operation::on_write_complete;
epoll_event event;
event.data.ptr = static_cast<completion_base*>(this);
event.events = EPOLLOUT | EPOLLRDHUP | EPOLLHUP;
(void)epoll_ctl(context_.epollFd_.get(), EPOLL_CTL_ADD, fd_, &event);
return;
}
auto oldState = state_.fetch_add(
io_epoll_context::write_sender::operation<Receiver>::io_flag,
std::memory_order_acq_rel);
if ((oldState & io_epoll_context::write_sender::operation<Receiver>::cancel_pending_mask) != 0) {
// io has been cancelled by a remote thread.
// The other thread is responsible for enqueueing the operation completion
return;
}
if (result == -ECANCELED) {
unifex::set_done(std::move(receiver_));
} else if (result >= 0) {
if constexpr (is_nothrow_receiver_of_v<Receiver, ssize_t>) {
unifex::set_value(std::move(receiver_), ssize_t(result));
} else {
UNIFEX_TRY {
unifex::set_value(std::move(receiver_), ssize_t(result));
} UNIFEX_CATCH (...) {
unifex::set_error(std::move(receiver_), std::current_exception());
}
}
} else {
unifex::set_error(
std::move(receiver_),
std::error_code{-int(result), std::system_category()});
}
}