in dispenso/parallel_for.h [301:394]
void parallel_for(
    TaskSetT& taskSet,
    const ChunkedRange<IntegerT>& range,
    F&& f,
    ParForOptions options = {}) {
  // Invokes f(begin, end) over disjoint sub-ranges that together cover
  // [range.start, range.end), using the pool behind taskSet. Dispatch:
  //   * empty range: nothing to run (taskSet.wait() is still called if options.wait);
  //   * options.maxThreads == 0, a one-element range, or a call that
  //     PerPoolPerThreadInfo::isParForRecursive reports as nested on this pool:
  //     run the whole range inline on the calling thread;
  //   * static chunking: delegate to detail::parallel_for_staticImpl;
  //   * otherwise: launch workers that repeatedly claim `chunk`-sized pieces from a
  //     shared atomic cursor. With options.wait the calling thread works too and the
  //     call ends with taskSet.wait(); without it the call returns after scheduling.
  // f is a forwarding reference. It is always std::forward-ed (never std::move-d) so an
  // lvalue callable owned by the caller is copied rather than left in a moved-from state.
  if (range.empty()) {
    if (options.wait) {
      taskSet.wait();
    }
    return;
  }
  // TODO(bbudge): With options.maxThreads, we might want to allow a small fanout factor in
  // recursive case?
  if (!options.maxThreads || range.size() == 1 ||
      detail::PerPoolPerThreadInfo::isParForRecursive(&taskSet.pool())) {
    f(range.start, range.end);
    if (options.wait) {
      taskSet.wait();
    }
    return;
  }
  if (range.isStatic()) {
    detail::parallel_for_staticImpl(taskSet, range, std::forward<F>(f), options);
    return;
  }
  const ssize_t N = taskSet.numPoolThreads();
  const bool useCallingThread = options.wait;
  // Can be -1 when the pool reports no threads and the caller participates (N == 0, wait).
  const ssize_t numToLaunch = std::min<ssize_t>(options.maxThreads, N - useCallingThread);
  if (numToLaunch == 1 && !useCallingThread) {
    // One asynchronous task handles the whole range. options.wait is necessarily false
    // here (useCallingThread == options.wait), so there is nothing to wait for.
    // ForceQueuingTag matches the multi-task asynchronous path below (presumably it keeps
    // schedule() from executing the task inline on this thread).
    taskSet.schedule(
        [range, f = std::forward<F>(f)]() { f(range.start, range.end); }, ForceQueuingTag());
    return;
  } else if (numToLaunch <= 0) {
    // No pool thread can help: run inline. `<=` (not `==`) also covers numToLaunch == -1,
    // which would otherwise reach calcChunkSize with a non-positive worker count.
    f(range.start, range.end);
    if (options.wait) {
      taskSet.wait();
    }
    return;
  }
  const IntegerT chunk = range.calcChunkSize(numToLaunch, useCallingThread);
  // NOTE(review): each worker performs one fetch_add past `end` before exiting; if range.end
  // is close to the maximum of IntegerT the cursor (or cur + chunk) may wrap. Not handled here.
  if (options.wait) {
    // The cursor lives on this stack frame and workers reference it; this relies on
    // taskSet.wait() below not returning until the tasks scheduled here have finished.
    // Aligned to a cache-line boundary (presumably to limit false sharing).
    alignas(kCacheLineSize) std::atomic<IntegerT> index(range.start);
    auto worker = [end = range.end, &index, f = std::forward<F>(f), chunk]() {
      auto recurseInfo = detail::PerPoolPerThreadInfo::parForRecurse();
      while (true) {
        // Relaxed: the counter only hands out disjoint work items; it does not publish data.
        IntegerT cur = index.fetch_add(chunk, std::memory_order_relaxed);
        if (cur >= end) {
          break;
        }
        f(cur, std::min<IntegerT>(static_cast<IntegerT>(cur + chunk), end));
      }
    };
    using Function = decltype(worker);
    for (ssize_t i = 0; i < numToLaunch; ++i) {
      taskSet.schedule(Function(worker));
    }
    worker();  // The calling thread participates as one more worker.
    taskSet.wait();
  } else {
    // Asynchronous: this call may return before the workers run, so the cursor is heap
    // allocated and kept alive by the shared_ptr copied into every worker. The two padding
    // buffers surround the atomic so it does not share cache lines with adjacent heap data.
    struct Atomic {
      explicit Atomic(IntegerT i) : index(i) {}
      char buffer[kCacheLineSize];
      std::atomic<IntegerT> index;
      char buffer2[kCacheLineSize];
    };
    // TODO(bbudge): dispenso::make_shared?
    auto wrapper = std::make_shared<Atomic>(range.start);
    auto worker =
        [end = range.end, wrapper = std::move(wrapper), f = std::forward<F>(f), chunk]() {
          auto recurseInfo = detail::PerPoolPerThreadInfo::parForRecurse();
          while (true) {
            IntegerT cur = wrapper->index.fetch_add(chunk, std::memory_order_relaxed);
            if (cur >= end) {
              break;
            }
            f(cur, std::min<IntegerT>(static_cast<IntegerT>(cur + chunk), end));
          }
        };
    using Function = decltype(worker);
    for (ssize_t i = 0; i < numToLaunch; ++i) {
      taskSet.schedule(Function(worker), ForceQueuingTag());
    }
  }
}