auto operator()

in include/unifex/find_if.hpp [147:247]


    auto operator()(
        Scheduler&& sched,
        const parallel_policy&,
        Iterator begin_it,
        Iterator end_it,
        Values&&... values) noexcept {
      // func_ is safe to run concurrently so let's make use of that

      // NOTE: Assumes random access iterator for now, on the assumption that the policy was accurate
      auto distance = std::distance(begin_it, end_it);
      using diff_t = decltype(distance);
      constexpr diff_t max_num_chunks = 32;
      constexpr diff_t min_chunk_size = 4;
      diff_t num_chunks = (distance/max_num_chunks) > min_chunk_size ?
        max_num_chunks : ((distance+min_chunk_size)/min_chunk_size);
      diff_t chunk_size = (distance+num_chunks)/num_chunks;

      // Found flag and vector that will be constructed in-place in the operation state
      struct State {
        std::atomic<bool> found_flag;
        std::vector<Iterator> perChunkState;
      };

      // The outer let_value keeps the vector of found results and the found flag
      // alive for the duration.
      // let_value_with constructs the vector and found_flag directly in the operation
      // state.
      // Use a two phase process largely to demonstrate a simple multi-phase algorithm
      // and to avoid using a cmpexch loop on an intermediate iterator.
      return
      unifex::let_value(
        unifex::just(std::forward<Values>(values)...),
        [func = std::move(func_), sched = std::move(sched), begin_it,
        chunk_size, end_it, num_chunks](Values&... values) mutable {
          return unifex::let_value_with([&](){return State{false, std::vector<Iterator>(num_chunks, end_it)};},[&](State& state) {
            // Inject a stop source and make it available for inner operations.
            // This stop source propagates into the algorithm through the receiver,
            // such that it will cancel the bulk_schedule operation.
            // It is also triggered if the downstream stop source is triggered.
            return unifex::let_value_with_stop_source([&](unifex::inplace_stop_source& stopSource) mutable {
              auto bulk_phase = unifex::bulk_join(
                  unifex::bulk_transform(
                    unifex::bulk_schedule(std::move(sched), num_chunks),
                    [&](diff_t index){
                      auto chunk_begin_it = begin_it + (chunk_size*index);
                      auto chunk_end_it = chunk_begin_it;
                      if(index < (num_chunks-1)) {
                        std::advance(chunk_end_it, chunk_size);
                      } else {
                        chunk_end_it = end_it;
                      }

                      for(auto it = chunk_begin_it; it != chunk_end_it; ++it) {
                        if(std::invoke(func, *it, values...)) {
                          // On success, store the value in the output array
                          // and cancel future work.
                          // This works on the assumption that bulk_schedule will launch
                          // tasks (or at least, test for cancellation) in
                          // iteration-space order, and hence only cancel future work,
                          // to maintain the find-first property.
                          state.perChunkState[index] = it;
                          state.found_flag = true;
                          stopSource.request_stop();
                          return;
                        }
                      }
                    },
                    unifex::par
                  )
                );
              return
                unifex::then(
                  unifex::let_done(
                    std::move(bulk_phase),
                    [&state](){
                      if(state.found_flag == true) {
                        // If the item was found, then continue as if not cancelled
                        return just();
                      } else {
                        // If there was cancellation and we did not find the item
                        // then propagate the cancellation and assume failure
                        // TODO: We are temporarily always recovering from cancellation
                        // until a variant sender is implemented to unify the two
                        // algorithms
                        return just();
                      }
                    }
                  ),
                  [&state, end_it, &values...]() mutable -> std::tuple<Iterator, Values...> {
                    for(auto it : state.perChunkState) {
                      if(it != end_it) {
                        return std::tuple<Iterator, Values...>(it, std::move(values)...);
                      }
                    }
                    return std::tuple<Iterator, Values...>(end_it, std::move(values)...);
                  }
                );
              });
            });
          });
    }