include/unifex/stop_immediately.hpp (361 lines of code) (raw):

/* * Copyright (c) Facebook, Inc. and its affiliates. * * Licensed under the Apache License Version 2.0 with LLVM Exceptions * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * https://llvm.org/LICENSE.txt * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include <unifex/receiver_concepts.hpp> #include <unifex/sender_concepts.hpp> #include <unifex/stream_concepts.hpp> #include <unifex/manual_lifetime.hpp> #include <unifex/type_traits.hpp> #include <unifex/type_list.hpp> #include <unifex/inplace_stop_token.hpp> #include <unifex/unstoppable_token.hpp> #include <unifex/get_stop_token.hpp> #include <unifex/async_trace.hpp> #include <unifex/bind_back.hpp> #include <unifex/exception.hpp> #include <atomic> #include <exception> #include <utility> #include <type_traits> #include <unifex/detail/prologue.hpp> namespace unifex { namespace _stop_immediately { template <typename SourceStream, typename... Values> struct _stream { struct type; }; template <typename SourceStream, typename... Values> using stream = typename _stream<remove_cvref_t<SourceStream>, Values...>::type; template <typename SourceStream, typename... Values> struct _stream<SourceStream, Values...>::type { private: using stream = type; enum class state { not_started, source_next_completed, source_next_active, source_next_active_stream_stopped, source_next_active_cleanup_requested, cleanup_completed }; struct cleanup_operation_base { virtual void start_cleanup() noexcept = 0; }; struct next_receiver_base { virtual void set_value(Values&&... values) && noexcept = 0; virtual void set_done() && noexcept = 0; virtual void set_error(std::exception_ptr ex) && noexcept = 0; }; struct cancel_next_callback { stream& stream_; void operator()() noexcept { auto oldState = stream_.state_.load(std::memory_order_acquire); if (oldState == state::source_next_active) { // We may be racing with the next() operation completing on another // thread so we need to use a compare_exchange here to decide the // race. // Note that the callback destructor is run when we receive // the next() operation completion signal before delivering the signal // to the true receiver. The destructor will will block waiting for // this method to return and so we are guaranteed that there will be // no further call to next() or to cleanup() before we return here. // The only concurrent state transition can be from // 'source_next_active' to 'idle' and there will be no further state // changes until we return. // Thus it should be safe to use 'relaxed' memory access for the // compare-exchange below since we have already synchronised with the // 'acquire' operation above. if (stream_.state_.compare_exchange_strong( oldState, state::source_next_active_stream_stopped, std::memory_order_relaxed)) { // Successfully acquired ownership over the receiver. // Send the 'done' signal immediately to signal the end of the // sequence and also send the stop signal to the still-running // next() operation. stream_.stopSource_.request_stop(); auto receiver = std::exchange(stream_.nextReceiver_, nullptr); UNIFEX_ASSERT(receiver != nullptr); std::move(*receiver).set_done(); } else { UNIFEX_ASSERT(oldState == state::source_next_completed); } } else { UNIFEX_ASSERT(oldState == state::source_next_completed); } } }; struct next_receiver { stream& stream_; inplace_stop_source& get_stop_source() const { return stream_.stopSource_; } // Note, parameters passed by value here just in case we are passed // references to values owned by the operation object that we will be // destroying before passing the values along to the next receiver. void set_value(Values... values) && noexcept { handle_signal([&](next_receiver_base* receiver) noexcept { UNIFEX_TRY { std::move(*receiver).set_value((Values&&)values...); } UNIFEX_CATCH (...) { std::move(*receiver).set_error(std::current_exception()); } }); } void set_done() && noexcept { handle_signal([](next_receiver_base* receiver) noexcept { std::move(*receiver).set_done(); }); } template <typename Error> void set_error(Error&& error) && noexcept { std::move(*this).set_error(make_exception_ptr((Error&&)error)); } void set_error(std::exception_ptr ex) && noexcept { auto& nextError = stream_.nextError_; nextError = std::move(ex); handle_signal([&](next_receiver_base* receiver) noexcept { std::move(*receiver).set_error(std::exchange(nextError, {})); }); } template <typename Func> void handle_signal(Func deliverSignalTo) noexcept { auto& strm = stream_; strm.nextOp_.destruct(); auto oldState = strm.state_.load(std::memory_order_acquire); if (oldState == state::source_next_active) { if (strm.state_.compare_exchange_strong( oldState, state::source_next_completed, std::memory_order_relaxed)) { // We acquired ownership of the receiver before it was cancelled. auto* receiver = std::exchange(strm.nextReceiver_, nullptr); UNIFEX_ASSERT(receiver != nullptr); deliverSignalTo(receiver); return; } } if (oldState == state::source_next_active_stream_stopped) { if (strm.state_.compare_exchange_strong( oldState, state::source_next_completed, std::memory_order_release, std::memory_order_acquire)) { // Successfully signalled that 'next' completed before 'cleanup' // operation started. Discard this signal without forwarding it on. return; } } // Otherwise, cleanup() was requested before this operation completed. // We are responsible for starting cleanup now that next() has finished. UNIFEX_ASSERT(oldState == state::source_next_active_cleanup_requested); UNIFEX_ASSERT(stream_.cleanupOp_ != nullptr); stream_.cleanupOp_->start_cleanup(); } friend inplace_stop_token tag_invoke( tag_t<get_stop_token>, const next_receiver& r) noexcept { return r.get_stop_source().get_token(); } template <typename Func> friend void tag_invoke( tag_t<visit_continuations>, const next_receiver& r, Func&& func) { std::invoke(func, r.op_->receiver_); } }; struct next_sender { stream& stream_; template <template <typename...> class Variant, template <typename...> class Tuple> using value_types = sender_value_types_t<next_sender_t<SourceStream>, Variant, Tuple>; template <template <typename...> class Variant> using error_types = sender_error_types_t<next_sender_t<SourceStream>, Variant>; static constexpr bool sends_done = true; template <typename Receiver> struct _op { struct type { struct concrete_receiver final : next_receiver_base { type& op_; explicit concrete_receiver(type& op) : op_(op) {} void set_value(Values&&... values) && noexcept final { op_.stopCallback_.destruct(); unifex::set_value(std::move(op_.receiver_), (Values&&)values...); } void set_done() && noexcept final { op_.stopCallback_.destruct(); unifex::set_done(std::move(op_.receiver_)); } void set_error(std::exception_ptr ex) && noexcept final { op_.stopCallback_.destruct(); unifex::set_error(std::move(op_.receiver_), std::move(ex)); } }; using ST = stop_token_type_t<Receiver&>; stream& stream_; concrete_receiver concreteReceiver_; UNIFEX_NO_UNIQUE_ADDRESS Receiver receiver_; UNIFEX_NO_UNIQUE_ADDRESS manual_lifetime< typename ST::template callback_type<cancel_next_callback>> stopCallback_; template <typename Receiver2> explicit type(stream& strm, Receiver2&& receiver) : stream_(strm) , concreteReceiver_(*this) , receiver_{(Receiver2&&)receiver} {} void start() noexcept { auto stopToken = get_stop_token(receiver_); if (stopToken.stop_requested()) { unifex::set_done(std::move(receiver_)); return; } static_assert( std::is_same_v<decltype(stopToken), ST>); UNIFEX_TRY { stream_.nextOp_.construct_with([&] { return unifex::connect( next(stream_.source_), next_receiver{stream_}); }); stream_.nextReceiver_ = &concreteReceiver_; stream_.state_.store( state::source_next_active, std::memory_order_relaxed); UNIFEX_TRY { stopCallback_.construct( std::move(stopToken), cancel_next_callback{stream_}); unifex::start(stream_.nextOp_.get()); } UNIFEX_CATCH (...) { stream_.nextReceiver_ = nullptr; stream_.nextOp_.destruct(); stream_.state_.store( state::source_next_completed, std::memory_order_relaxed); unifex::set_error(std::move(receiver_), std::current_exception()); } } UNIFEX_CATCH (...) { stream_.state_.store( state::source_next_completed, std::memory_order_relaxed); unifex::set_error(std::move(receiver_), std::current_exception()); } } }; }; template <typename Receiver> using operation = typename _op<remove_cvref_t<Receiver>>::type; template <typename Receiver> operation<Receiver> connect(Receiver&& receiver) && { return operation<Receiver>{stream_, (Receiver&&)receiver}; } template <typename Receiver> void connect(Receiver&& receiver) const& =delete; }; struct cleanup_sender { stream& stream_; template <template <typename...> class Variant, template <typename...> class Tuple> using value_types = Variant<>; template <template <typename...> class Variant> using error_types = typename concat_type_lists_unique_t< sender_error_types_t<cleanup_sender_t<SourceStream>, type_list>, type_list<std::exception_ptr>>::template apply<Variant>; static constexpr bool sends_done = true; template <typename Receiver> struct _op { struct type final : cleanup_operation_base { struct receiver_wrapper { type& op_; void set_done() && noexcept { auto& op = op_; op.cleanupOp_.destruct(); if (op.stream_.nextError_) { unifex::set_error( std::move(op.receiver_), std::move(op.stream_.nextError_)); } else { unifex::set_done(std::move(op.receiver_)); } } template <typename Error> void set_error(Error&& error) && noexcept { auto& op = op_; op.cleanupOp_.destruct(); // Prefer sending the error from the next(source_) rather than // the error from cleanup(source_). if (op.stream_.nextError_) { unifex::set_error( std::move(op.receiver_), std::move(op.stream_.nextError_)); } else { unifex::set_error(std::move(op.receiver_), (Error&&)error); } } }; stream& stream_; UNIFEX_NO_UNIQUE_ADDRESS Receiver receiver_; manual_lifetime<cleanup_operation_t<SourceStream, receiver_wrapper>> cleanupOp_; template <typename Receiver2> explicit type(stream& strm, Receiver2&& receiver) : stream_(strm) , receiver_((Receiver2&&)receiver) {} void start() noexcept { auto oldState = stream_.state_.load(std::memory_order_acquire); if (oldState == state::source_next_active_stream_stopped) { stream_.cleanupOp_ = this; if (stream_.state_.compare_exchange_strong( oldState, state::source_next_active_cleanup_requested, std::memory_order_release, std::memory_order_acquire)) { // Successfully signalled that cleanup has been requested and // that the next() operation should call start_cleanup() when // it completes. return; } } // Otherwise, next() operation has completed so we are responsible // for starting if (oldState == state::source_next_completed) { // A prior next() call has been made on the underlying stream and // so we need to call cleanup(). start_cleanup(); return; } // No prior next() call has been made. Nothing to do for cleanup. // Send done() immediately. UNIFEX_ASSERT(oldState == state::not_started); unifex::set_done(std::move(receiver_)); } void start_cleanup() noexcept final { UNIFEX_TRY { cleanupOp_.construct_with([&] { return unifex::connect( cleanup(stream_.source_), receiver_wrapper{*this}); }); unifex::start(cleanupOp_.get()); } UNIFEX_CATCH (...) { // Prefer to send the error from next(source_) over the error // from cleanup(source_) if there was one. if (stream_.nextError_) { unifex::set_error(std::move(receiver_), std::move(stream_.nextError_)); } else { unifex::set_error(std::move(receiver_), std::current_exception()); } } } }; }; template <typename Receiver> using operation = typename _op<remove_cvref_t<Receiver>>::type; template <typename Receiver> operation<Receiver> connect(Receiver&& receiver) && { return operation<Receiver>{stream_, (Receiver &&) receiver}; } template <typename Receiver> void connect(Receiver&& receiver) const& = delete; }; UNIFEX_NO_UNIQUE_ADDRESS SourceStream source_; std::atomic<state> state_{state::not_started}; cleanup_operation_base* cleanupOp_ = nullptr; next_receiver_base* nextReceiver_ = nullptr; inplace_stop_source stopSource_; std::exception_ptr nextError_; manual_lifetime<next_operation_t<SourceStream, next_receiver>> nextOp_; public: template <typename SourceStream2> explicit type(SourceStream2&& source) : source_((SourceStream2&&)source) {} type(type&& other) : source_(std::move(other.source_)) {} friend next_sender tag_invoke(tag_t<next>, stream& s) { return {s}; } friend cleanup_sender tag_invoke(tag_t<cleanup>, stream& s) { return {s}; } }; } // namespace _stop_immediately namespace _stop_immediately_cpo { template <typename... Values> struct _fn { template <typename SourceStream> auto operator()(SourceStream&& source) const { return _stop_immediately::stream<SourceStream, Values...>{ (SourceStream &&) source}; } constexpr auto operator()() const noexcept(is_nothrow_callable_v< tag_t<bind_back>, _fn>) -> bind_back_result_t<_fn> { return bind_back(*this); } }; } // namespace _stop_immediately_cpo template <typename... Values> inline constexpr _stop_immediately_cpo::_fn<Values...> stop_immediately{}; } // namespace unifex #include <unifex/detail/epilogue.hpp>