include/unifex/when_all.hpp (313 lines of code) (raw):

/* * Copyright (c) Facebook, Inc. and its affiliates. * * Licensed under the Apache License Version 2.0 with LLVM Exceptions * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * https://llvm.org/LICENSE.txt * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include <unifex/async_trace.hpp> #include <unifex/get_stop_token.hpp> #include <unifex/inplace_stop_token.hpp> #include <unifex/manual_lifetime.hpp> #include <unifex/receiver_concepts.hpp> #include <unifex/sender_concepts.hpp> #include <unifex/type_traits.hpp> #include <unifex/type_list.hpp> #include <unifex/blocking.hpp> #include <unifex/std_concepts.hpp> #include <atomic> #include <cstddef> #include <optional> #include <tuple> #include <type_traits> #include <variant> #include <unifex/detail/prologue.hpp> namespace unifex { namespace _when_all { template < std::size_t Index, template <std::size_t> class Receiver, typename... Senders> struct _operation_tuple { struct type; }; template < std::size_t Index, template <std::size_t> class Receiver, typename... Senders> using operation_tuple = typename _operation_tuple<Index, Receiver, Senders...>::type; template < std::size_t Index, template <std::size_t> class Receiver, typename First, typename... Rest> struct _operation_tuple<Index, Receiver, First, Rest...> { struct type; }; template < std::size_t Index, template <std::size_t> class Receiver, typename First, typename... Rest> struct _operation_tuple<Index, Receiver, First, Rest...>::type : operation_tuple<Index + 1, Receiver, Rest...> { template <typename Parent> explicit type(Parent& parent, First&& first, Rest&&... rest) : operation_tuple<Index + 1, Receiver, Rest...>{parent, (Rest &&) rest...}, op_(connect((First &&) first, Receiver<Index>{parent})) {} void start() noexcept { unifex::start(op_); operation_tuple<Index + 1, Receiver, Rest...>::start(); } private: connect_result_t<First, Receiver<Index>> op_; }; template <std::size_t Index, template <std::size_t> class Receiver> struct _operation_tuple<Index, Receiver> { struct type; }; template <std::size_t Index, template <std::size_t> class Receiver> struct _operation_tuple<Index, Receiver>::type { template <typename Parent> explicit type(Parent&) noexcept {} void start() noexcept {} }; struct cancel_operation { inplace_stop_source& stopSource_; void operator()() noexcept { stopSource_.request_stop(); } }; template <typename Receiver, typename... Senders> struct _op { struct type; }; template <typename Receiver, typename... Senders> using operation = typename _op<remove_cvref_t<Receiver>, Senders...>::type; template <typename... Errors> using unique_decayed_error_types = concat_type_lists_unique_t< type_list<std::decay_t<Errors>>...>; template <template <typename...> class Variant, typename... Senders> using error_types = typename concat_type_lists_unique_t< sender_error_types_t<Senders, unique_decayed_error_types>..., type_list<std::exception_ptr>>::template apply<Variant>; template <typename... Values> using decayed_value_tuple = type_list<std::tuple<std::decay_t<Values>...>>; template <typename Sender> using value_variant_for_sender = typename sender_value_types_t<Sender, concat_type_lists_unique_t, decayed_value_tuple> ::template apply<std::variant>; template <size_t Index, typename Receiver, typename... Senders> struct _element_receiver { struct type; }; template <size_t Index, typename Receiver, typename... Senders> using element_receiver = typename _element_receiver<Index, Receiver, Senders...>::type; template <size_t Index, typename Receiver, typename... Senders> struct _element_receiver<Index, Receiver, Senders...>::type final { using element_receiver = type; operation<Receiver, Senders...>& op_; template <typename... Values> void set_value(Values&&... values) noexcept { UNIFEX_TRY { std::get<Index>(op_.values_) .emplace( std::in_place_type<std::tuple<std::decay_t<Values>...>>, (Values &&) values...); op_.element_complete(); } UNIFEX_CATCH (...) { this->set_error(std::current_exception()); } } template <typename Error> void set_error(Error&& error) noexcept { if (!op_.doneOrError_.exchange(true, std::memory_order_relaxed)) { op_.error_.emplace(std::in_place_type<std::decay_t<Error>>, (Error &&) error); op_.stopSource_.request_stop(); } op_.element_complete(); } void set_done() noexcept { if (!op_.doneOrError_.exchange(true, std::memory_order_relaxed)) { op_.stopSource_.request_stop(); } op_.element_complete(); } Receiver& get_receiver() const { return op_.receiver_; } template(typename CPO, typename R) (requires is_receiver_query_cpo_v<CPO> AND same_as<R, element_receiver> AND is_callable_v<CPO, const Receiver&>) friend auto tag_invoke(CPO cpo, const R& r) noexcept( is_nothrow_callable_v<CPO, const Receiver&>) -> callable_result_t<CPO, const Receiver&> { return std::move(cpo)(std::as_const(r.get_receiver())); } inplace_stop_source& get_stop_source() const { return op_.stopSource_; } friend inplace_stop_token tag_invoke( tag_t<get_stop_token>, const element_receiver& r) noexcept { return r.get_stop_source().get_token(); } template <typename Func> friend void tag_invoke( tag_t<visit_continuations>, const element_receiver& r, Func&& func) { std::invoke(func, r.get_receiver()); } }; template <typename Receiver, typename... Senders> struct _op<Receiver, Senders...>::type { using operation = type; using receiver_type = Receiver; template <std::size_t Index, typename Receiver2, typename... Senders2> friend struct _element_receiver; explicit type(Receiver&& receiver, Senders&&... senders) : receiver_((Receiver &&) receiver), ops_(*this, (Senders &&) senders...) {} void start() noexcept { stopCallback_.construct( get_stop_token(receiver_), cancel_operation{stopSource_}); ops_.start(); } private: void element_complete() noexcept { if (refCount_.fetch_sub(1, std::memory_order_acq_rel) == 1) { deliver_result(); } } void deliver_result() noexcept { stopCallback_.destruct(); if (get_stop_token(receiver_).stop_requested()) { unifex::set_done(std::move(receiver_)); } else if (doneOrError_.load(std::memory_order_relaxed)) { if (error_.has_value()) { std::visit( [this](auto&& error) { unifex::set_error(std::move(receiver_), (decltype(error))error); }, std::move(error_.value())); } else { unifex::set_done(std::move(receiver_)); } } else { deliver_value(std::index_sequence_for<Senders...>{}); } } template <std::size_t... Indices> void deliver_value(std::index_sequence<Indices...>) noexcept { UNIFEX_TRY { unifex::set_value( std::move(receiver_), std::get<Indices>(std::move(values_)).value()...); } UNIFEX_CATCH (...) { unifex::set_error(std::move(receiver_), std::current_exception()); } } std::tuple<std::optional<value_variant_for_sender<remove_cvref_t<Senders>>>...> values_; std::optional<error_types<std::variant, remove_cvref_t<Senders>...>> error_; std::atomic<std::size_t> refCount_{sizeof...(Senders)}; std::atomic<bool> doneOrError_{false}; inplace_stop_source stopSource_; UNIFEX_NO_UNIQUE_ADDRESS manual_lifetime<typename stop_token_type_t< Receiver&>::template callback_type<cancel_operation>> stopCallback_; Receiver receiver_; template <std::size_t Index> using op_element_receiver = element_receiver<Index, Receiver, Senders...>; operation_tuple<0, op_element_receiver, Senders...> ops_; }; template <typename... Senders> struct _sender { class type; }; template <typename... Senders> using sender = typename _sender<remove_cvref_t<Senders>...>::type; template <typename Receiver, typename Indices, typename... Senders> extern const bool _when_all_connectable_v; template <typename Receiver, std::size_t... Indices, typename... Senders> inline constexpr bool _when_all_connectable_v<Receiver, std::index_sequence<Indices...>, Senders...> = (sender_to<Senders, element_receiver<Indices, Receiver, Senders...>> &&...); template <typename Receiver, typename... Senders> inline constexpr bool when_all_connectable_v = _when_all_connectable_v<Receiver, std::index_sequence_for<Senders...>, Senders...>; template <typename... Senders> class _sender<Senders...>::type { using sender = type; public: static_assert(sizeof...(Senders) > 0); template < template <typename...> class Variant, template <typename...> class Tuple> using value_types = Variant<Tuple<value_variant_for_sender<Senders>...>>; template <template <typename...> class Variant> using error_types = error_types<Variant, Senders...>; static constexpr bool sends_done = true; template <typename... Senders2> explicit type(Senders2&&... senders) : senders_((Senders2 &&) senders...) {} template(typename CPO, typename Sender, typename Receiver) (requires same_as<CPO, tag_t<unifex::connect>> AND same_as<remove_cvref_t<Sender>, type> AND when_all_connectable_v<remove_cvref_t<Receiver>, member_t<Sender, Senders>...>) friend auto tag_invoke([[maybe_unused]] CPO cpo, Sender&& sender, Receiver&& receiver) -> operation<Receiver, member_t<Sender, Senders>...> { return std::apply([&](Senders&&... senders) { return operation<Receiver, member_t<Sender, Senders>...>{ (Receiver &&) receiver, (Senders &&) senders...}; }, static_cast<Sender &&>(sender).senders_); } private: // Customise the 'blocking' CPO to combine the blocking-nature // of each of the child operations. friend blocking_kind tag_invoke(tag_t<blocking>, const sender& s) noexcept { bool alwaysInline = true; bool alwaysBlocking = true; bool neverBlocking = false; auto handleBlockingStatus = [&](blocking_kind kind) noexcept { switch (kind) { case blocking_kind::never: neverBlocking = true; [[fallthrough]]; case blocking_kind::maybe: alwaysBlocking = false; [[fallthrough]]; case blocking_kind::always: alwaysInline = false; [[fallthrough]]; case blocking_kind::always_inline: break; } }; std::apply([&](const auto&... senders) { (void)std::initializer_list<int>{ (handleBlockingStatus(blocking(senders)), 0)... }; }, s.senders_); if (neverBlocking) { return blocking_kind::never; } else if (alwaysInline) { return blocking_kind::always_inline; } else if (alwaysBlocking) { return blocking_kind::always; } else { return blocking_kind::maybe; } } std::tuple<Senders...> senders_; }; namespace _cpo { struct _fn { template (typename... Senders) (requires (unifex::sender<Senders> &&...) AND tag_invocable<_fn, Senders...>) auto operator()(Senders&&... senders) const -> tag_invoke_result_t<_fn, Senders...> { return tag_invoke(*this, (Senders &&) senders...); } template (typename... Senders) (requires (typed_sender<Senders> &&...) AND (!tag_invocable<_fn, Senders...>)) auto operator()(Senders&&... senders) const -> _when_all::sender<Senders...> { return _when_all::sender<Senders...>{(Senders &&) senders...}; } }; } // namespace _cpo } // namespace _when_all inline constexpr _when_all::_cpo::_fn when_all{}; } // namespace unifex #include <unifex/detail/epilogue.hpp>