include/unifex/thread_unsafe_event_loop.hpp (318 lines of code) (raw):

/* * Copyright (c) Facebook, Inc. and its affiliates. * * Licensed under the Apache License Version 2.0 with LLVM Exceptions * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * https://llvm.org/LICENSE.txt * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include <unifex/config.hpp> #include <unifex/get_stop_token.hpp> #include <unifex/manual_lifetime.hpp> #include <unifex/receiver_concepts.hpp> #include <unifex/sender_concepts.hpp> #include <unifex/stop_token_concepts.hpp> #include <chrono> #include <exception> #include <optional> #include <type_traits> #include <utility> #include <unifex/detail/prologue.hpp> namespace unifex { class thread_unsafe_event_loop; namespace _thread_unsafe_event_loop { using clock_t = std::chrono::steady_clock; using time_point_t = clock_t::time_point; class cancel_callback; class operation_base { friend cancel_callback; protected: using execute_fn = void(operation_base*) noexcept; operation_base(thread_unsafe_event_loop& loop, execute_fn* execute) noexcept : loop_(loop), execute_(execute) {} operation_base(const operation_base&) = delete; operation_base(operation_base&&) = delete; public: void start() noexcept; private: friend thread_unsafe_event_loop; void execute() noexcept { this->execute_(this); } thread_unsafe_event_loop& loop_; operation_base* next_; operation_base** prevPtr_; execute_fn* execute_; protected: time_point_t dueTime_; }; class cancel_callback { public: explicit cancel_callback(operation_base& op) noexcept : op_(&op) {} void operator()() noexcept; private: operation_base* const op_; }; class scheduler; template <typename Duration> struct _schedule_after_sender { class type; }; template <typename Duration> using schedule_after_sender = typename _schedule_after_sender<Duration>::type; template <typename Duration, typename Receiver> struct _after_op { class type; }; template <typename Duration, typename Receiver> using after_operation = typename _after_op<Duration, remove_cvref_t<Receiver>>::type; template <typename Duration, typename Receiver> class _after_op<Duration, Receiver>::type final : public operation_base { friend schedule_after_sender<Duration>; public: void start() noexcept { this->dueTime_ = clock_t::now() + duration_; callback_.construct( get_stop_token(receiver_), cancel_callback{*this}); operation_base::start(); } private: template <typename Receiver2> explicit type( Receiver2&& r, Duration d, thread_unsafe_event_loop& loop) : operation_base(loop, &type::execute_impl) , receiver_((Receiver2 &&) r) , duration_(d) {} static void execute_impl(operation_base* p) noexcept { auto& self = *static_cast<type*>(p); self.callback_.destruct(); if constexpr (is_stop_never_possible_v< stop_token_type_t<Receiver&>>) { unifex::set_value(std::move(self.receiver_)); } else { if (get_stop_token(self.receiver_).stop_requested()) { unifex::set_done(std::move(self.receiver_)); } else { unifex::set_value(std::move(self.receiver_)); } } } UNIFEX_NO_UNIQUE_ADDRESS Receiver receiver_; UNIFEX_NO_UNIQUE_ADDRESS Duration duration_; UNIFEX_NO_UNIQUE_ADDRESS manual_lifetime<typename stop_token_type_t< Receiver&>::template callback_type<cancel_callback>> callback_; }; template <typename Duration> class _schedule_after_sender<Duration>::type { using schedule_after_sender = type; public: template < template <typename...> class Variant, template <typename...> class Tuple> using value_types = Variant<Tuple<>>; template <template <typename...> class Variant> using error_types = Variant<>; static constexpr bool sends_done = true; template <typename Receiver> after_operation<Duration, remove_cvref_t<Receiver>> connect(Receiver&& r) const& { return after_operation<Duration, remove_cvref_t<Receiver>>{ (Receiver &&) r, duration_, loop_}; } private: friend scheduler; explicit type( thread_unsafe_event_loop& loop, Duration duration) noexcept : loop_(loop), duration_(duration) {} thread_unsafe_event_loop& loop_; Duration duration_; }; struct schedule_at_sender; template <typename Receiver> struct _at_op { class type; }; template <typename Receiver> using at_operation = typename _at_op<remove_cvref_t<Receiver>>::type; template <typename Receiver> class _at_op<Receiver>::type final : public operation_base { public: void start() noexcept { callback_.construct( get_stop_token(receiver_), cancel_callback{*this}); operation_base::start(); } private: friend schedule_at_sender; template <typename Receiver2> explicit type( Receiver2&& r, time_point_t tp, thread_unsafe_event_loop& loop) : operation_base(loop, &type::execute_impl), receiver_((Receiver2 &&) r) { this->dueTime_ = tp; } static void execute_impl(operation_base* p) noexcept { auto& self = *static_cast<type*>(p); self.callback_.destruct(); if constexpr (is_stop_never_possible_v< stop_token_type_t<Receiver&>>) { unifex::set_value(std::move(self.receiver_)); } else { if (get_stop_token(self.receiver_).stop_requested()) { unifex::set_done(std::move(self.receiver_)); } else { unifex::set_value(std::move(self.receiver_)); } } } UNIFEX_NO_UNIQUE_ADDRESS Receiver receiver_; UNIFEX_NO_UNIQUE_ADDRESS manual_lifetime<typename stop_token_type_t< Receiver&>::template callback_type<cancel_callback>> callback_; }; struct schedule_at_sender { template < template <typename...> class Variant, template <typename...> class Tuple> using value_types = Variant<Tuple<>>; template <template <typename...> class Variant> using error_types = Variant<>; static constexpr bool sends_done = true; template <typename Receiver> at_operation<remove_cvref_t<Receiver>> connect(Receiver&& r) const& { return at_operation<remove_cvref_t<Receiver>>{ (Receiver &&) r, dueTime_, loop_}; } private: friend scheduler; explicit schedule_at_sender( thread_unsafe_event_loop& loop, time_point_t dueTime) : loop_(loop), dueTime_(dueTime) {} thread_unsafe_event_loop& loop_; time_point_t dueTime_; }; class scheduler { public: auto schedule_at(time_point_t dueTime) const noexcept { return schedule_at_sender{*loop_, dueTime}; } template <typename Rep, typename Ratio> auto schedule_after(std::chrono::duration<Rep, Ratio> d) const noexcept { return schedule_after_sender<std::chrono::duration<Rep, Ratio>>{*loop_, d}; } auto schedule() const noexcept { return schedule_after(std::chrono::milliseconds(0)); } friend bool operator==(scheduler a, scheduler b) noexcept { return a.loop_ == b.loop_; } friend bool operator!=(scheduler a, scheduler b) noexcept { return a.loop_ != b.loop_; } private: friend thread_unsafe_event_loop; explicit scheduler(thread_unsafe_event_loop& loop) noexcept : loop_(&loop) {} thread_unsafe_event_loop* loop_; }; template <typename T> struct _sync_wait_promise { class type; }; template <typename T> using sync_wait_promise = typename _sync_wait_promise<T>::type; template <typename T> class _sync_wait_promise<T>::type { using sync_wait_promise = type; enum class state { incomplete, done, value, error }; class receiver { public: template <typename... Values> void set_value(Values&&... values) && noexcept { UNIFEX_TRY { unifex::activate_union_member(promise_.value_, (Values &&) values...); promise_.state_ = state::value; } UNIFEX_CATCH (...) { unifex::activate_union_member(promise_.exception_, std::current_exception()); promise_.state_ = state::error; } } void set_error(std::exception_ptr ex) && noexcept { unifex::activate_union_member(promise_.exception_, std::move(ex)); promise_.state_ = state::error; } void set_done() && noexcept { promise_.state_ = state::done; } private: friend sync_wait_promise; explicit receiver(sync_wait_promise& promise) noexcept : promise_(promise) {} sync_wait_promise& promise_; }; public: type() noexcept {} ~type() { if (state_ == state::value) { unifex::deactivate_union_member(value_); } else if (state_ == state::error) { unifex::deactivate_union_member(exception_); } } receiver get_receiver() noexcept { return receiver{*this}; } std::optional<T> get() && { switch (state_) { case state::done: return std::nullopt; case state::value: return std::move(value_).get(); case state::error: std::rethrow_exception(exception_.get()); default: UNIFEX_ASSERT(false); std::terminate(); } } private: union { manual_lifetime<T> value_; manual_lifetime<std::exception_ptr> exception_; }; state state_ = state::incomplete; }; } // namespace _thread_unsafe_event_loop class thread_unsafe_event_loop { using operation_base = _thread_unsafe_event_loop::operation_base; using scheduler = _thread_unsafe_event_loop::scheduler; using cancel_callback = _thread_unsafe_event_loop::cancel_callback; friend operation_base; friend cancel_callback; void enqueue(operation_base* op) noexcept; void run_until_empty() noexcept; operation_base* head_ = nullptr; public: using clock_t = _thread_unsafe_event_loop::clock_t; using time_point_t = _thread_unsafe_event_loop::time_point_t; scheduler get_scheduler() noexcept { return scheduler{*this}; } template < typename Sender, typename Result = sender_single_value_result_t<remove_cvref_t<Sender>>> std::optional<Result> sync_wait(Sender&& sender) { using promise_t = _thread_unsafe_event_loop::sync_wait_promise<Result>; promise_t promise; auto op = connect((Sender &&) sender, promise.get_receiver()); start(op); run_until_empty(); return std::move(promise).get(); } }; namespace _thread_unsafe_event_loop { inline void operation_base::start() noexcept { loop_.enqueue(this); } } } // namespace unifex #include <unifex/detail/epilogue.hpp>