in include/ylt/coro_io/coro_io.hpp [226:321]
// Adapts a callback-style asynchronous operation into an awaitable
// Lazy<ret_type>, optionally wiring it to the current coroutine's
// cancellation slot.
//
// io_func: callable that starts the operation. It is invoked with a
//          completion callback; whatever arguments that callback receives
//          are forwarded to handler.set_value(...) before the coroutine is
//          resumed.
// obj:     the I/O object the operation runs on. Its executor is used for
//          dispatch/post, and detail::cancel(obj) is called on it when a
//          Terminate signal has to abort an operation that was started.
//
// Returns (via co_return) the value stored by handler.set_value(...). When
// the Terminate handler cannot be registered, the stored value is
// std::errc::operation_canceled and io_func is never called.
//
// The cancellable path coordinates through a shared atomic flag that moves
// through detail::no_cancel_flag -> could_cancel_flag -> start_cancel_flag
// -> finish_cancel_flag (or jumps straight to finish_cancel_flag once the
// coroutine has been resumed).
inline async_simple::coro::Lazy<ret_type> async_io(IO_func io_func,
io_object &obj) noexcept {
callback_awaitor<ret_type> awaitor;
// Fetch the cancellation slot bound to the current coroutine (may be null).
auto slot = co_await async_simple::coro::CurrentSlot{};
if (!slot) {
// No slot: nothing can request cancellation, so just start the operation
// and resume with its result.
co_return co_await awaitor.await_resume([&](auto handler) {
io_func([&, handler](auto &&...args) mutable {
handler.set_value(std::forward<decltype(args)>(args)...);
handler.resume();
});
});
}
else {
auto executor = obj.get_executor();
// Shared state flag for the cancellation hand-shake.
// NOTE(review): relies on the value-initialized atomic being equal to
// detail::no_cancel_flag (presumably 0) -- confirm against detail.
auto lock = std::make_shared<std::atomic<int>>();
// Assigned inside the setup lambda below before it is read.
bool hasCanceled;
auto result = co_await awaitor.await_resume([&, &lock_ref = lock](
auto handler) mutable {
// Take an own shared_ptr copy of the flag; the signal handler captures this
// copy by value. Presumably this keeps the flag alive independently of the
// coroutine frame -- TODO confirm.
auto lock = lock_ref;
// Register the Terminate handler. The code treats a false return from
// emplace() as "already canceled".
hasCanceled = !slot->emplace(
async_simple::SignalType::Terminate,
[&obj, lock](async_simple::SignalType signalType,
async_simple::Signal *signal) {
// signalType and signal are not used by this handler.
// Hand-shake step (same sequence as after io_func below): the first party
// to arrive moves no_cancel -> could_cancel and does nothing else.
int expected = detail::no_cancel_flag;
if (!lock->compare_exchange_strong(expected,
detail::could_cancel_flag,
std::memory_order_acq_rel)) {
// The other party already arrived: claim the cancel by moving
// could_cancel -> start_cancel, cancel the object, then publish
// finish_cancel. Any other observed value means there is nothing to do.
if (expected == detail::could_cancel_flag) {
if (lock->compare_exchange_strong(expected,
detail::start_cancel_flag,
std::memory_order_release)) {
detail::cancel(obj);
lock->store(detail::finish_cancel_flag,
std::memory_order_release);
}
}
}
});
if (hasCanceled) {
// Registration failed: do not start io_func; complete through the executor
// with an operation_canceled error code instead.
asio::dispatch(executor, [handler]() {
handler.set_value(
std::make_error_code(std::errc::operation_canceled));
handler.resume();
});
}
else {
// Start the operation. On completion, remove the Terminate handler from
// the slot, store the result and resume the coroutine.
io_func([&, handler](auto &&...args) mutable {
slot->clear(async_simple::Terminate);
handler.set_value(std::forward<decltype(args)>(args)...);
handler.resume();
});
// The operation has now been initiated. Perform the same hand-shake step
// as the signal handler: if the signal handler got here first, this side
// performs the cancel; otherwise it only marks could_cancel so that a
// later signal will cancel.
int expected = detail::no_cancel_flag;
if (!lock->compare_exchange_strong(expected, detail::could_cancel_flag,
std::memory_order_acq_rel)) {
if (expected == detail::could_cancel_flag) {
if (lock->compare_exchange_strong(expected,
detail::start_cancel_flag,
std::memory_order_release)) {
detail::cancel(obj);
lock->store(detail::finish_cancel_flag,
std::memory_order_release);
}
}
}
}
});
// Resumed with a result. If the operation was actually started, close the
// hand-shake so that no cancel can begin from now on, and wait for one that
// is already in progress.
if (!hasCanceled) {
int expected = detail::no_cancel_flag;
// Flag still no_cancel: move it straight to finish_cancel, which makes
// both hand-shake CAS sequences above fall through without cancelling.
if (!lock->compare_exchange_strong(expected, detail::finish_cancel_flag,
std::memory_order_acq_rel)) {
if (expected != detail::finish_cancel_flag) {
// do { ... } while (0) is used only so that `break` can skip the wait loop.
do {
if (expected == detail::could_cancel_flag) {
// Only one party has arrived so far: try could_cancel -> finish_cancel.
// Done if that succeeds or if the flag turned out to be finish_cancel
// already.
if (lock->compare_exchange_strong(expected,
detail::finish_cancel_flag,
std::memory_order_acq_rel) ||
expected == detail::finish_cancel_flag) {
break;
}
}
// Reaching here means start_cancel_flag was observed: a detail::cancel(obj)
// call is in progress. Yield by posting empty tasks to the executor until
// the flag leaves start_cancel_flag, so this coroutine does not return
// while the io object is still being used by the cancel call.
for (; lock->load(std::memory_order_acquire) ==
detail::start_cancel_flag;) {
co_await coro_io::post(
[]() {
},
executor);
}
} while (0);
}
}
}
co_return result;
}
}