include/unifex/bulk_schedule.hpp (167 lines of code) (raw):

/* * Copyright (c) Facebook, Inc. and its affiliates. * * Licensed under the Apache License Version 2.0 with LLVM Exceptions * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * https://llvm.org/LICENSE.txt * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include <algorithm> #include <unifex/scheduler_concepts.hpp> #include <unifex/sender_concepts.hpp> #include <unifex/receiver_concepts.hpp> #include <unifex/tag_invoke.hpp> #include <unifex/get_execution_policy.hpp> #include <unifex/execution_policy.hpp> #include <unifex/get_stop_token.hpp> #include <unifex/bind_back.hpp> #include <unifex/detail/prologue.hpp> namespace unifex { // Size of chunk used for cancellation allowing for some vectorisation constexpr size_t bulk_cancellation_chunk_size = 16; namespace _bulk_schedule { template<typename Integral, typename Receiver> struct _schedule_receiver { class type; }; template<typename Integral, typename Receiver> using schedule_receiver = typename _schedule_receiver<Integral, Receiver>::type; template<typename Integral, typename Receiver> class _schedule_receiver<Integral, Receiver>::type { public: template<typename Receiver2> explicit type(Integral count, Receiver2&& r) : count_(std::move(count)) , receiver_((Receiver2&&)r) {} void set_value() noexcept(is_nothrow_receiver_of_v<Receiver> && is_nothrow_next_receiver_v<Receiver, Integral>) { using policy_t = decltype(get_execution_policy(receiver_)); auto stop_token = get_stop_token(receiver_); const bool stop_possible = !is_stop_never_possible_v<decltype(stop_token)> && stop_token.stop_possible(); if(stop_possible) { for (Integral chunk_start(0); chunk_start < count_; chunk_start += bulk_cancellation_chunk_size) { if(stop_token.stop_requested()) { unifex::set_done(std::move(receiver_)); return; } Integral chunk_end = std::min(chunk_start + static_cast<Integral>(bulk_cancellation_chunk_size), count_); if constexpr (is_one_of_v<policy_t, unsequenced_policy, parallel_unsequenced_policy>) { UNIFEX_DIAGNOSTIC_PUSH // Vectorisable version #if defined(__clang__) // When optimizing for size (e.g. with -Oz), Clang will not // vectorize this loop, and will emit a warning. There's // nothing to be done about the warning, though, so just // suppress it. #pragma clang diagnostic ignored "-Wpass-failed" #pragma clang loop vectorize(enable) interleave(enable) #elif defined(__GNUC__) #pragma GCC ivdep #elif defined(_MSC_VER) #pragma loop(ivdep) #endif for (Integral i(chunk_start); i < chunk_end; ++i) { unifex::set_next(receiver_, Integral(i)); } UNIFEX_DIAGNOSTIC_POP } else { // Sequenced version for (Integral i(chunk_start); i < chunk_end; ++i) { unifex::set_next(receiver_, Integral(i)); } } } } else { if constexpr (is_one_of_v<policy_t, unsequenced_policy, parallel_unsequenced_policy>) { UNIFEX_DIAGNOSTIC_PUSH // Vectorisable version #if defined(__clang__) // When optimizing for size (e.g. with -Oz), Clang will not // vectorize this loop, and will emit a warning. There's // nothing to be done about the warning, though, so just // suppress it. #pragma clang diagnostic ignored "-Wpass-failed" #pragma clang loop vectorize(enable) interleave(enable) #elif defined(__GNUC__) #pragma GCC ivdep #elif defined(_MSC_VER) #pragma loop(ivdep) #endif for (Integral i(0); i < count_; ++i) { unifex::set_next(receiver_, Integral(i)); } UNIFEX_DIAGNOSTIC_POP } else { // Sequenced version for (Integral i(0); i < count_; ++i) { unifex::set_next(receiver_, Integral(i)); } } } unifex::set_value(std::move(receiver_)); } template(typename Error) (requires receiver<Receiver, Error>) void set_error(Error&& e) noexcept { unifex::set_error(std::move(receiver_), (Error&&)e); } void set_done() noexcept { unifex::set_done(std::move(receiver_)); } private: Integral count_; Receiver receiver_; }; template<typename Scheduler, typename Integral> struct _default_sender { class type; }; template<typename Scheduler, typename Integral> using default_sender = typename _default_sender<Scheduler, Integral>::type; template<typename Scheduler, typename Integral> class _default_sender<Scheduler, Integral>::type { using schedule_sender_t = decltype(unifex::schedule(UNIFEX_DECLVAL(const Scheduler&))); public: template<template<typename...> class Variant, template<typename...> class Tuple> using value_types = Variant<Tuple<>>; template<template<typename...> class Variant, template<typename...> class Tuple> using next_types = Variant<Tuple<Integral>>; template<template<typename...> class Variant> using error_types = sender_error_types_t<schedule_sender_t, Variant>; static constexpr bool sends_done = true; template<typename Scheduler2> explicit type(Scheduler2&& s, Integral count) : scheduler_(static_cast<Scheduler2&&>(s)) , count_(std::move(count)) {} template(typename Self, typename BulkReceiver) (requires same_as<remove_cvref_t<Self>, type> AND receiver_of<BulkReceiver> AND is_next_receiver_v<BulkReceiver, Integral>) friend auto tag_invoke(tag_t<unifex::connect>, Self&& s, BulkReceiver&& r) { return unifex::connect( unifex::schedule(static_cast<Self&&>(s).scheduler_), schedule_receiver<Integral, remove_cvref_t<BulkReceiver>>{ static_cast<Self&&>(s).count_, static_cast<BulkReceiver&&>(r)}); } private: Scheduler scheduler_; Integral count_; }; struct _fn { template(typename Scheduler, typename Integral) (requires tag_invocable<_fn, Scheduler, Integral>) auto operator()(Scheduler&& s, Integral n) const noexcept(is_nothrow_tag_invocable_v<_fn, Scheduler, Integral>) -> tag_invoke_result_t<_fn, Scheduler, Integral> { return tag_invoke(_fn{}, (Scheduler&&)s, std::move(n)); } template(typename Scheduler, typename Integral) (requires scheduler<Scheduler> AND (!tag_invocable<_fn, Scheduler, Integral>)) auto operator()(Scheduler&& s, Integral n) const noexcept( std::is_nothrow_constructible_v<remove_cvref_t<Scheduler>, Scheduler> && std::is_nothrow_move_constructible_v<Integral>) -> default_sender<remove_cvref_t<Scheduler>, Integral> { return default_sender<remove_cvref_t<Scheduler>, Integral>{(Scheduler&&)s, std::move(n)}; } template <typename Integral> constexpr auto operator()(Integral n) const noexcept(is_nothrow_callable_v< tag_t<bind_back>, _fn, Integral>) -> bind_back_result_t<_fn, Integral> { return bind_back(*this, n); } }; } // namespace _bulk_schedule inline constexpr _bulk_schedule::_fn bulk_schedule{}; } // namespace unifex #include <unifex/detail/epilogue.hpp>