include/unifex/find_if.hpp (338 lines of code) (raw):

/* * Copyright (c) Facebook, Inc. and its affiliates. * * Licensed under the Apache License Version 2.0 with LLVM Exceptions * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * https://llvm.org/LICENSE.txt * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include <unifex/config.hpp> #include <unifex/just.hpp> #include <unifex/let_value_with.hpp> #include <unifex/let_value_with_stop_source.hpp> #include <unifex/execution_policy.hpp> #include <unifex/receiver_concepts.hpp> #include <unifex/sender_concepts.hpp> #include <unifex/stream_concepts.hpp> #include <unifex/type_traits.hpp> #include <unifex/blocking.hpp> #include <unifex/get_stop_token.hpp> #include <unifex/async_trace.hpp> #include <unifex/then.hpp> #include <unifex/let_done.hpp> #include <unifex/type_list.hpp> #include <unifex/std_concepts.hpp> #include <unifex/bulk_join.hpp> #include <unifex/bulk_transform.hpp> #include <unifex/bulk_schedule.hpp> #include <unifex/bind_back.hpp> #include <exception> #include <functional> #include <type_traits> #include <utility> #include <memory> #include <unifex/detail/prologue.hpp> namespace unifex { namespace _find_if { template <typename Predecessor, typename Receiver, typename Func, typename FuncPolicy> struct _receiver { struct type; }; template <typename Predecessor, typename Receiver, typename Func, typename FuncPolicy> using receiver_t = typename _receiver<Predecessor, Receiver, Func, FuncPolicy>::type; template <typename Predecessor, typename Receiver, typename Func, typename FuncPolicy> struct _operation_state; template <typename Predecessor, typename Receiver, typename Func, typename FuncPolicy> struct _receiver<Predecessor, Receiver, Func, FuncPolicy>::type { UNIFEX_NO_UNIQUE_ADDRESS Func func_; UNIFEX_NO_UNIQUE_ADDRESS Receiver receiver_; // NO_UNIQUE_ADDRESS here triggers what appears to be a layout bug /*UNIFEX_NO_UNIQUE_ADDRESS*/ FuncPolicy funcPolicy_; _operation_state<Predecessor, Receiver, Func, FuncPolicy>& operation_state_; // Helper receiver type to unpack a tuple template<typename OutputReceiver> struct unpack_receiver { OutputReceiver output_receiver_; _operation_state<Predecessor, Receiver, Func, FuncPolicy>& operation_state_; template<typename Tuple, size_t... Idx> void unpack_helper(OutputReceiver&& output_receiver, Tuple&& t, std::index_sequence<Idx...>) { unifex::set_value( (OutputReceiver&&) output_receiver, std::move(std::get<Idx>(t))...); } template <typename Iterator, typename... Values> void set_value(std::tuple<Iterator, Values...>&& packedResult) && noexcept { operation_state_.cleanup(); UNIFEX_TRY { unpack_helper( (OutputReceiver&&) output_receiver_, std::move(packedResult), std::make_index_sequence<std::tuple_size_v<std::tuple<Iterator, Values...>>>{}); } UNIFEX_CATCH(...) { unifex::set_error((OutputReceiver&&)output_receiver_, std::current_exception()); } } template <typename Error> void set_error(Error&& error) && noexcept { operation_state_.cleanup(); unifex::set_error((OutputReceiver &&) output_receiver_, (Error &&) error); } void set_done() && noexcept { operation_state_.cleanup(); unifex::set_done((OutputReceiver &&) output_receiver_); } template(typename CPO, typename R) (requires is_receiver_query_cpo_v<CPO> AND same_as<R, unpack_receiver<OutputReceiver>>) friend auto tag_invoke(CPO cpo, const R& r) noexcept( is_nothrow_callable_v<CPO, const OutputReceiver&>) -> callable_result_t<CPO, const OutputReceiver&> { return std::move(cpo)(std::as_const(r.output_receiver_)); } }; struct find_if_helper { Func func_; template<typename Scheduler, typename Iterator, typename... Values> auto operator()( Scheduler&& /*unused*/, const sequenced_policy&, Iterator begin_it, Iterator end_it, Values&&... values) noexcept { // Sequential implementation return unifex::then( unifex::just(std::forward<Values>(values)...), [this, begin_it, end_it](auto... values) { for(auto it = begin_it; it != end_it; ++it) { if(std::invoke((Func &&) func_, *it, values...)) { return std::tuple<Iterator, Values...>(it, std::move(values)...); } } return std::tuple<Iterator, Values...>(end_it, std::move(values)...); } ); } // Cancellable parallel algorithm. // This version is two phase to avoid a non-trivial atomic in the middle. // With more built-in algorithms it can be simplified: // * let_value_with to allocate non-movable state in the operation state. // * unpack to deal with tuple to pack conversion // It could also be simplified by making more of the code custom, but I wanted // to demonstrate reuse of internal algorithms to build something more compelex // and cancellable. template<typename Scheduler, typename Iterator, typename... Values> auto operator()( Scheduler&& sched, const parallel_policy&, Iterator begin_it, Iterator end_it, Values&&... values) noexcept { // func_ is safe to run concurrently so let's make use of that // NOTE: Assumes random access iterator for now, on the assumption that the policy was accurate auto distance = std::distance(begin_it, end_it); using diff_t = decltype(distance); constexpr diff_t max_num_chunks = 32; constexpr diff_t min_chunk_size = 4; diff_t num_chunks = (distance/max_num_chunks) > min_chunk_size ? max_num_chunks : ((distance+min_chunk_size)/min_chunk_size); diff_t chunk_size = (distance+num_chunks)/num_chunks; // Found flag and vector that will be constructed in-place in the operation state struct State { std::atomic<bool> found_flag; std::vector<Iterator> perChunkState; }; // The outer let_value keeps the vector of found results and the found flag // alive for the duration. // let_value_with constructs the vector and found_flag directly in the operation // state. // Use a two phase process largely to demonstrate a simple multi-phase algorithm // and to avoid using a cmpexch loop on an intermediate iterator. return unifex::let_value( unifex::just(std::forward<Values>(values)...), [func = std::move(func_), sched = std::move(sched), begin_it, chunk_size, end_it, num_chunks](Values&... values) mutable { return unifex::let_value_with([&](){return State{false, std::vector<Iterator>(num_chunks, end_it)};},[&](State& state) { // Inject a stop source and make it available for inner operations. // This stop source propagates into the algorithm through the receiver, // such that it will cancel the bulk_schedule operation. // It is also triggered if the downstream stop source is triggered. return unifex::let_value_with_stop_source([&](unifex::inplace_stop_source& stopSource) mutable { auto bulk_phase = unifex::bulk_join( unifex::bulk_transform( unifex::bulk_schedule(std::move(sched), num_chunks), [&](diff_t index){ auto chunk_begin_it = begin_it + (chunk_size*index); auto chunk_end_it = chunk_begin_it; if(index < (num_chunks-1)) { std::advance(chunk_end_it, chunk_size); } else { chunk_end_it = end_it; } for(auto it = chunk_begin_it; it != chunk_end_it; ++it) { if(std::invoke(func, *it, values...)) { // On success, store the value in the output array // and cancel future work. // This works on the assumption that bulk_schedule will launch // tasks (or at least, test for cancellation) in // iteration-space order, and hence only cancel future work, // to maintain the find-first property. state.perChunkState[index] = it; state.found_flag = true; stopSource.request_stop(); return; } } }, unifex::par ) ); return unifex::then( unifex::let_done( std::move(bulk_phase), [&state](){ if(state.found_flag == true) { // If the item was found, then continue as if not cancelled return just(); } else { // If there was cancellation and we did not find the item // then propagate the cancellation and assume failure // TODO: We are temporarily always recovering from cancellation // until a variant sender is implemented to unify the two // algorithms return just(); } } ), [&state, end_it, &values...]() mutable -> std::tuple<Iterator, Values...> { for(auto it : state.perChunkState) { if(it != end_it) { return std::tuple<Iterator, Values...>(it, std::move(values)...); } } return std::tuple<Iterator, Values...>(end_it, std::move(values)...); } ); }); }); }); } }; template <typename Iterator, typename... Values> void set_value(Iterator begin_it, Iterator end_it, Values&&... values) && noexcept; template <typename Error> void set_error(Error&& error) && noexcept { unifex::set_error((Receiver &&) receiver_, (Error &&) error); } void set_done() && noexcept { unifex::set_done((Receiver &&) receiver_); } template(typename CPO, typename R) (requires is_receiver_query_cpo_v<CPO> AND same_as<R, type>) friend auto tag_invoke(CPO cpo, const R& r) noexcept( is_nothrow_callable_v<CPO, const Receiver&>) -> callable_result_t<CPO, const Receiver&> { return std::move(cpo)(std::as_const(r.receiver_)); } template <typename Visit> friend void tag_invoke(tag_t<visit_continuations>, const type& r, Visit&& visit) { std::invoke(visit, r.receiver_); } }; template <typename Predecessor, typename Receiver, typename Func, typename FuncPolicy> struct _operation_state { using receiver_type = receiver_t<Predecessor, Receiver, Func, FuncPolicy>; template <typename... Ts> using find_if_apply_t = connect_result_t< callable_result_t< typename receiver_type::find_if_helper, std::decay_t<get_scheduler_result_t<const Receiver&>>&&, FuncPolicy, Ts&&...>, typename receiver_type::template unpack_receiver<Receiver>>; using operation_state_t = typename sender_value_types_t< Predecessor, single_overload, find_if_apply_t>::type; template<typename Sender> _operation_state(Sender&& s, Receiver&& r) : predOp_{unifex::connect( static_cast<Sender&&>(s).pred_, receiver_type{ static_cast<Sender&&>(s).func_, static_cast<Receiver&&>(r), static_cast<Sender&&>(s).funcPolicy_, *this })} { } ~_operation_state() noexcept { } void start() noexcept { unifex::start(predOp_); } void startInner() noexcept { started_ = true; unifex::start(innerOp_.get()); } void cleanup() noexcept { innerOp_.destruct(); } connect_result_t<Predecessor, receiver_type> predOp_; manual_lifetime<operation_state_t> innerOp_; bool started_ = false; }; template <typename Predecessor, typename Receiver, typename Func, typename FuncPolicy> template <typename Iterator, typename... Values> void _receiver<Predecessor, Receiver, Func, FuncPolicy>::type::set_value( Iterator begin_it, Iterator end_it, Values&&... values) && noexcept { auto sched = unifex::get_scheduler(receiver_); unpack_receiver<Receiver> unpack{(Receiver &&) receiver_, operation_state_}; UNIFEX_TRY { auto find_if_implementation_sender = find_if_helper{std::move(func_)}( std::move(sched), funcPolicy_, begin_it, end_it, (Values&&) values...); // Store nested operation state inside find_if's operation state operation_state_.innerOp_.construct_with([&]() mutable { return unifex::connect(std::move(find_if_implementation_sender), std::move(unpack)); }); operation_state_.startInner(); } UNIFEX_CATCH(...) { unifex::set_error(std::move(unpack), std::current_exception()); } } template <typename Predecessor, typename Func, typename FuncPolicy> struct _sender { struct type; }; template <typename Predecessor, typename Func, typename FuncPolicy> using sender_t = typename _sender<Predecessor, Func, FuncPolicy>::type; template <typename Predecessor, typename Func, typename FuncPolicy> struct _sender<Predecessor, Func, FuncPolicy>::type { UNIFEX_NO_UNIQUE_ADDRESS Predecessor pred_; UNIFEX_NO_UNIQUE_ADDRESS Func func_; UNIFEX_NO_UNIQUE_ADDRESS FuncPolicy funcPolicy_; template <typename BeginIt, typename EndIt, typename... Args> using result = type_list<type_list<BeginIt, Args...>>; template < template <typename...> class Variant, template <typename...> class Tuple> using value_types = type_list_nested_apply_t< sender_value_types_t<Predecessor, concat_type_lists_unique_t, result>, Variant, Tuple>; template <template <typename...> class Variant> using error_types = typename concat_type_lists_unique_t< sender_error_types_t<Predecessor, type_list>, type_list<std::exception_ptr>>::template apply<Variant>; static constexpr bool sends_done = true; template <typename Receiver> using receiver_type = receiver_t<Predecessor, Receiver, Func, FuncPolicy>; friend constexpr auto tag_invoke(tag_t<blocking>, const type& sender) { return blocking(sender.pred_); } template(typename Sender, typename Receiver) (requires same_as<remove_cvref_t<Sender>, type> AND receiver<Receiver>) friend auto tag_invoke(tag_t<unifex::connect>, Sender&& s, Receiver&& r) noexcept( std::is_nothrow_constructible_v<remove_cvref_t<Receiver>, Receiver> && std::is_nothrow_constructible_v<Func, member_t<Sender, Func>> && is_nothrow_connectable_v< member_t<Sender, Predecessor>, receiver_type<remove_cvref_t<Receiver>>>) -> _operation_state<Predecessor, Receiver, Func, FuncPolicy> { return _operation_state<Predecessor, Receiver, Func, FuncPolicy>{ static_cast<Sender&&>(s), static_cast<Receiver&&>(r)}; } }; } // namespace _find_if namespace _find_if_cpo { inline const struct _fn { public: template(typename Sender, typename Func, typename FuncPolicy) (requires tag_invocable<_fn, Sender, Func, FuncPolicy>) auto operator()(Sender&& predecessor, Func&& func, FuncPolicy policy) const noexcept(is_nothrow_tag_invocable_v<_fn, Sender, Func, FuncPolicy>) -> tag_invoke_result_t<_fn, Sender, Func, FuncPolicy> { return unifex::tag_invoke(_fn{}, (Sender&&)predecessor, (Func&&)func, (FuncPolicy&&)policy); } template(typename Sender, typename Func, typename FuncPolicy) (requires (!tag_invocable<_fn, Sender, Func, FuncPolicy>)) auto operator()(Sender&& predecessor, Func&& func, FuncPolicy policy) const noexcept( std::is_nothrow_constructible_v<remove_cvref_t<Sender>, Sender> && std::is_nothrow_constructible_v<remove_cvref_t<Func>, Func> && std::is_nothrow_constructible_v<remove_cvref_t<FuncPolicy>, FuncPolicy>) -> _find_if::sender_t<remove_cvref_t<Sender>, std::decay_t<Func>, FuncPolicy>{ return _find_if::sender_t<remove_cvref_t<Sender>, std::decay_t<Func>, FuncPolicy>{ (Sender &&) predecessor, (Func &&) func, (FuncPolicy &&) policy}; } template <typename Func, typename FuncPolicy> constexpr auto operator()(Func&& f, const FuncPolicy& policy) const noexcept(is_nothrow_callable_v< tag_t<bind_back>, _fn, Func, const FuncPolicy&>) -> bind_back_result_t<_fn, Func, const FuncPolicy&> { return bind_back(*this, (Func&&)f, policy); } } find_if{}; } // namespace _find_if_cpo using _find_if_cpo::find_if; } // namespace unifex #include <unifex/detail/epilogue.hpp>