inline async_simple::coro::Lazy async_io()

in include/ylt/coro_io/coro_io.hpp [226:321]


inline async_simple::coro::Lazy<ret_type> async_io(IO_func io_func,
                                                   io_object &obj) noexcept {
  callback_awaitor<ret_type> awaitor;
  auto slot = co_await async_simple::coro::CurrentSlot{};
  if (!slot) {
    co_return co_await awaitor.await_resume([&](auto handler) {
      io_func([&, handler](auto &&...args) mutable {
        handler.set_value(std::forward<decltype(args)>(args)...);
        handler.resume();
      });
    });
  }
  else {
    auto executor = obj.get_executor();
    auto lock = std::make_shared<std::atomic<int>>();
    bool hasCanceled;
    auto result = co_await awaitor.await_resume([&, &lock_ref = lock](
                                                    auto handler) mutable {
      auto lock = lock_ref;
      hasCanceled = !slot->emplace(
          async_simple::SignalType::Terminate,
          [&obj, lock](async_simple::SignalType signalType,
                       async_simple::Signal *signal) {
            int expected = detail::no_cancel_flag;
            if (!lock->compare_exchange_strong(expected,
                                               detail::could_cancel_flag,
                                               std::memory_order_acq_rel)) {
              if (expected == detail::could_cancel_flag) {
                if (lock->compare_exchange_strong(expected,
                                                  detail::start_cancel_flag,
                                                  std::memory_order_release)) {
                  detail::cancel(obj);
                  lock->store(detail::finish_cancel_flag,
                              std::memory_order_release);
                }
              }
            }
          });
      if (hasCanceled) {
        asio::dispatch(executor, [handler]() {
          handler.set_value(
              std::make_error_code(std::errc::operation_canceled));
          handler.resume();
        });
      }
      else {
        io_func([&, handler](auto &&...args) mutable {
          slot->clear(async_simple::Terminate);
          handler.set_value(std::forward<decltype(args)>(args)...);
          handler.resume();
        });
        int expected = detail::no_cancel_flag;
        if (!lock->compare_exchange_strong(expected, detail::could_cancel_flag,
                                           std::memory_order_acq_rel)) {
          if (expected == detail::could_cancel_flag) {
            if (lock->compare_exchange_strong(expected,
                                              detail::start_cancel_flag,
                                              std::memory_order_release)) {
              detail::cancel(obj);
              lock->store(detail::finish_cancel_flag,
                          std::memory_order_release);
            }
          }
        }
      }
    });
    if (!hasCanceled) {
      int expected = detail::no_cancel_flag;
      if (!lock->compare_exchange_strong(expected, detail::finish_cancel_flag,
                                         std::memory_order_acq_rel)) {
        if (expected != detail::finish_cancel_flag) {
          do {
            if (expected == detail::could_cancel_flag) {
              if (lock->compare_exchange_strong(expected,
                                                detail::finish_cancel_flag,
                                                std::memory_order_acq_rel) ||
                  expected == detail::finish_cancel_flag) {
                break;
              }
            }
            // flag is start_cancel_flag now.
            // wait cancel finish to make sure io object's life-time
            for (; lock->load(std::memory_order_acquire) ==
                   detail::start_cancel_flag;) {
              co_await coro_io::post(
                  []() {
                  },
                  executor);
            }
          } while (0);
        }
      }
    }
    co_return result;
  }
}