in dispenso/parallel_for.h [434:532]
void parallel_for(
    TaskSetT& taskSet,
    StateContainer& states,
    const StateGen& defaultState,
    const ChunkedRange<IntegerT>& range,
    F&& f,
    ParForOptions options = {}) {
  // Runs `f(state, begin, end)` over half-open sub-ranges [begin, end) of `range`, giving each
  // participant its own state object produced by `defaultState()` and appended to `states`.
  //
  // Strategy, chosen in this order:
  //  - empty range: no work (waits on `taskSet` if `options.wait`).
  //  - `options.maxThreads == 0`, a one-element range, or a parallel-for already active on this
  //    pool for the current thread (`isParForRecursive`): one state, whole range run inline.
  //  - static ranges: delegated to `detail::parallel_for_staticImpl`.
  //  - no pool task can be launched: one state, whole range run inline.
  //  - otherwise: dynamic chunking; workers repeatedly claim `chunk`-sized pieces from a shared
  //    atomic counter until it passes `range.end`.
  //
  // `states`: work is handed out starting at `states.begin()`, so callers presumably pass an
  // empty container (pre-existing elements would be reused). Scheduled tasks hold references to
  // its elements, so it must not be modified or destroyed until those tasks have finished.
  // `f`: in the dynamic path it is copied into the workers; when exactly one task is launched it is
  // forwarded (moved only if the caller passed an rvalue).
  // `options.wait`: when true the calling thread also processes chunks and this function blocks on
  // `taskSet.wait()`; when false it returns after scheduling.
  if (range.empty()) {
    if (options.wait) {
      taskSet.wait();
    }
    return;
  }
  // Serial fallback. The recursion check presumably avoids nesting parallel loops on one pool.
  if (!options.maxThreads || range.size() == 1 ||
      detail::PerPoolPerThreadInfo::isParForRecursive(&taskSet.pool())) {
    states.emplace_back(defaultState());
    f(*states.begin(), range.start, range.end);
    if (options.wait) {
      taskSet.wait();
    }
    return;
  }
  if (range.isStatic()) {
    detail::parallel_for_staticImpl(
        taskSet, states, defaultState, range, std::forward<F>(f), options);
    return;
  }

  const ssize_t N = taskSet.numPoolThreads();
  // When waiting, the calling thread is a participant, so one fewer pool task is launched.
  const bool useCallingThread = options.wait;
  // Clamp at zero: if the pool reports no threads, `N - useCallingThread` may be negative.
  const ssize_t numToLaunch =
      std::max<ssize_t>(0, std::min<ssize_t>(options.maxThreads, N - useCallingThread));

  if (numToLaunch == 0) {
    // Nothing can be launched: run the whole range inline. Emplacing exactly one state here
    // guarantees `states` is non-empty even when `options.wait` is false (previously zero states
    // were emplaced in that case and `*states.begin()` was dereferenced on an empty container).
    states.emplace_back(defaultState());
    f(*states.begin(), range.start, range.end);
    if (options.wait) {
      taskSet.wait();
    }
    return;
  }

  // One state per launched task, plus one for the calling thread when it participates. All
  // emplacement happens before any reference to an element is taken below.
  for (ssize_t i = 0; i < numToLaunch + useCallingThread; ++i) {
    states.emplace_back(defaultState());
  }

  if (numToLaunch == 1 && !useCallingThread) {
    // A single task gets the whole range; no chunking needed. `options.wait` is necessarily false
    // here (useCallingThread == options.wait), so there is nothing to wait on.
    // std::forward, not std::move: `f` is a forwarding reference that may alias the caller's
    // lvalue, which must not be moved from.
    taskSet.schedule([&s = states.front(), range, f = std::forward<F>(f)]() {
      f(s, range.start, range.end);
    });
    return;
  }

  const IntegerT chunk = range.calcChunkSize(numToLaunch, useCallingThread);
  // NOTE(review): `fetch_add(chunk)` and `cur + chunk` below are not guarded against exceeding the
  // representable range of IntegerT when `range.end` is close to its maximum.
  if (options.wait) {
    // Counter on this stack frame, captured by reference: relies on the `taskSet.wait()` below not
    // returning until every scheduled worker has finished.
    alignas(kCacheLineSize) std::atomic<IntegerT> index(range.start);
    auto worker = [end = range.end, &index, f, chunk](auto& s) {
      // Presumably marks this thread as running parallel-for work on the pool (what
      // isParForRecursive queries); kept as a named local so it lives for the whole loop.
      auto recurseInfo = detail::PerPoolPerThreadInfo::parForRecurse();
      while (true) {
        IntegerT cur = index.fetch_add(chunk, std::memory_order_relaxed);
        if (cur >= end) {
          break;
        }
        f(s, cur, std::min<IntegerT>(static_cast<IntegerT>(cur + chunk), end));
      }
    };
    auto it = states.begin();
    for (ssize_t i = 0; i < numToLaunch; ++i) {
      taskSet.schedule([&s = *it++, worker]() { worker(s); });
    }
    // The calling thread uses the last state and works alongside the pool tasks.
    worker(*it);
    taskSet.wait();
  } else {
    // Not waiting: this frame may return before the workers run, so the counter is heap-allocated
    // and shared (via shared_ptr) by every copy of `worker`. The buffers pad it by a cache line on
    // each side, presumably to keep it off cache lines used by other data.
    struct Atomic {
      explicit Atomic(IntegerT i) : index(i) {}
      char buffer[kCacheLineSize];
      std::atomic<IntegerT> index;
      char buffer2[kCacheLineSize];
    };
    auto wrapper = std::make_shared<Atomic>(range.start);
    auto worker = [end = range.end, wrapper = std::move(wrapper), f, chunk](auto& s) {
      // Same guard as in the waiting path; named so it lives for the whole loop.
      auto recurseInfo = detail::PerPoolPerThreadInfo::parForRecurse();
      while (true) {
        IntegerT cur = wrapper->index.fetch_add(chunk, std::memory_order_relaxed);
        if (cur >= end) {
          break;
        }
        f(s, cur, std::min<IntegerT>(static_cast<IntegerT>(cur + chunk), end));
      }
    };
    auto it = states.begin();
    for (ssize_t i = 0; i < numToLaunch; ++i) {
      // ForceQueuingTag presumably forces the task into the pool queue instead of running inline.
      taskSet.schedule([&s = *it++, worker]() { worker(s); }, ForceQueuingTag());
    }
  }
}