in include/unifex/find_if.hpp [147:247]
auto operator()(
Scheduler&& sched,
const parallel_policy&,
Iterator begin_it,
Iterator end_it,
Values&&... values) noexcept {
// func_ is safe to run concurrently, so make use of that.
// NOTE: Assumes random-access iterators for now, on the assumption that the supplied policy is accurate.
auto distance = std::distance(begin_it, end_it);
using diff_t = decltype(distance);
constexpr diff_t max_num_chunks = 32;
constexpr diff_t min_chunk_size = 4;
diff_t num_chunks = (distance/max_num_chunks) > min_chunk_size ?
max_num_chunks : ((distance+min_chunk_size)/min_chunk_size);
diff_t chunk_size = (distance+num_chunks)/num_chunks;
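// Worked examples of the chunking arithmetic above:
//   distance == 10   -> num_chunks == 3,  chunk_size == 4:  [0,4) [4,8) [8,10)
//   distance == 1000 -> num_chunks == 32, chunk_size == 32: 31 chunks of 32, then [992,1000)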
// Found flag and vector that will be constructed in-place in the operation state
struct State {
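// Set by any chunk that finds a match; lets the completion step distinguish
// "cancelled because a match was found" from "cancelled from downstream".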
std::atomic<bool> found_flag;
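// One result slot per chunk, later initialised to end_it as the "no match in
// this chunk" sentinel. Each task writes only its own slot, so no
// compare-exchange on a shared iterator is needed.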
std::vector<Iterator> perChunkState;
};
// The outer let_value keeps the vector of found results and the found flag
// alive for the duration.
// let_value_with constructs the vector and found_flag directly in the operation
// state.
// Use a two-phase process largely to demonstrate a simple multi-phase algorithm
// and to avoid a compare-exchange loop on an intermediate iterator.
return
unifex::let_value(
unifex::just(std::forward<Values>(values)...),
[func = std::move(func_), sched = std::move(sched), begin_it,
chunk_size, end_it, num_chunks](Values&... values) mutable {
return unifex::let_value_with(
[&]() { return State{false, std::vector<Iterator>(num_chunks, end_it)}; },
[&](State& state) {
// Inject a stop source and make it available for inner operations.
// This stop source propagates into the algorithm through the receiver,
// such that it will cancel the bulk_schedule operation.
// It is also triggered if the downstream stop source is triggered.
return unifex::let_value_with_stop_source([&](unifex::inplace_stop_source& stopSource) mutable {
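// Parallel phase: bulk_schedule(sched, num_chunks) emits the chunk indices
// 0..num_chunks-1 as a many-sender, bulk_transform runs the per-chunk scan
// below for each index (unifex::par declares the functor safe to run
// concurrently), and bulk_join collapses the result back into a single
// sender that completes once every chunk has finished or been cancelled.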
auto bulk_phase = unifex::bulk_join(
unifex::bulk_transform(
unifex::bulk_schedule(std::move(sched), num_chunks),
[&](diff_t index){
// Clamp chunk boundaries to the input range: with the rounded-up chunk_size
// the trailing chunks could otherwise start or end past end_it
// (e.g. distance == 160 gives 32 chunks of 6, covering 192 slots).
const auto dist = std::distance(begin_it, end_it);
auto begin_offset = chunk_size*index;
if(begin_offset > dist) {
begin_offset = dist;
}
auto end_offset = begin_offset + chunk_size;
if(index >= (num_chunks-1) || end_offset > dist) {
end_offset = dist;
}
auto chunk_begin_it = begin_it + begin_offset;
auto chunk_end_it = begin_it + end_offset;
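// Scan this chunk. The loop does not consult found_flag: an earlier chunk
// must still report its own (earlier) match even if a later chunk has
// already found one, because the final reduction prefers the lowest chunk.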
for(auto it = chunk_begin_it; it != chunk_end_it; ++it) {
if(std::invoke(func, *it, values...)) {
// On a match, record the iterator in this chunk's result slot and
// request cancellation of the remaining work.
// This relies on bulk_schedule launching tasks (or at least testing
// for cancellation) in iteration-space order, so that only later,
// not-yet-started chunks are cancelled and the find-first property
// is maintained.
state.perChunkState[index] = it;
state.found_flag = true;
stopSource.request_stop();
return;
}
}
},
unifex::par
)
);
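// Completion phase: let_done converts a done (cancelled) completion of the
// bulk phase back into a value completion, and the then() continuation
// reduces the per-chunk results into the final (iterator, values...) result.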
return
unifex::then(
unifex::let_done(
std::move(bulk_phase),
[&state](){
if(state.found_flag == true) {
// If the item was found, then continue as if not cancelled
return just();
} else {
// If we were cancelled and did not find the item, we should
// propagate the cancellation and treat it as failure.
// TODO: We temporarily always recover from cancellation until a
// variant sender is implemented to unify the two algorithms.
return just();
}
}
),
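// Reduce: the chunks partition the range in order, so the first slot that
// is not end_it holds the first match in the whole range; otherwise return
// end_it to signal "not found".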
[&state, end_it, &values...]() mutable -> std::tuple<Iterator, Values...> {
for(auto it : state.perChunkState) {
if(it != end_it) {
return std::tuple<Iterator, Values...>(it, std::move(values)...);
}
}
return std::tuple<Iterator, Values...>(end_it, std::move(values)...);
}
);
});
});
});
}