in dispenso/for_each.h [55:128]
void for_each_n(TaskSetT& tasks, Iter start, size_t n, F&& f, ForEachOptions options = {}) {
  // Serial fallback: run every item inline on the calling thread when
  //  - there is nothing to do (n == 0),
  //  - there is a single item and the caller is going to wait anyway,
  //  - parallelism is disabled (maxThreads == 0), or
  //  - we are already inside a parallel loop on this pool (avoid nested fan-out).
  // TODO(bbudge): With options.maxThreads, we might want to allow a small fanout factor in
  // recursive case?
  if (!n || (n == 1 && options.wait) || !options.maxThreads ||
      detail::PerPoolPerThreadInfo::isParForRecursive(&tasks.pool())) {
    for (size_t i = 0; i < n; ++i) {
      f(*start);
      ++start;
    }
    if (options.wait) {
      tasks.wait();
    }
    return;
  }

  // Number of chunks: bounded by the pool's thread count, the caller's maxThreads, and the
  // number of items (never more chunks than items).
  ssize_t numThreads = std::min<ssize_t>(tasks.numPoolThreads(), options.maxThreads);
  numThreads = std::min<ssize_t>(numThreads, n);
  // n >= 1 and maxThreads >= 1 here, but a pool reporting zero threads would still give zero
  // chunks. Zero chunks cannot partition the range (staticChunkSize presumably divides by the
  // chunk count — confirm), so always use at least one chunk.
  numThreads = std::max<ssize_t>(numThreads, 1);

  auto chunking = detail::staticChunkSize(n, numThreads);

  // Builds the task for the half-open range [first, last). Each task holds its own copy of f
  // and keeps the parForRecurse() guard alive while it runs (presumably what makes nested
  // for_each_n calls on this pool take the serial path above — confirm).
  auto makeChunkTask = [&f](Iter first, Iter last) {
    return [first, last, f]() {
      auto recurseInfo = detail::PerPoolPerThreadInfo::parForRecurse();
      for (Iter it = first; it != last; ++it) {
        f(*it);
      }
    };
  };

  // Chunk layout: the first chunks hold ceilChunkSize items and the remaining ones hold one
  // fewer. When the split is perfect (transitionTaskIndex == numThreads), every chunk holds
  // ceilChunkSize items.
  size_t chunkSize = chunking.ceilChunkSize;
  bool perfectlyChunked = chunking.transitionTaskIndex == numThreads;
  // Equivalent to (!perfectlyChunked) ? chunking.transitionTaskIndex : numThreads - 1, i.e. the
  // number of full-size chunks to schedule before the final chunk.
  ssize_t firstLoopLen = chunking.transitionTaskIndex - perfectlyChunked;

  ssize_t t = 0;
  for (; t < firstLoopLen; ++t) {
    Iter next = start;
    std::advance(next, chunkSize);
    tasks.schedule(makeChunkTask(start, next));
    start = next;
  }

  // Remaining chunks are one item smaller unless the split was perfect.
  chunkSize -= !perfectlyChunked;
  // Submit all chunks except the last one.
  for (; t < numThreads - 1; ++t) {
    Iter next = start;
    std::advance(next, chunkSize);
    tasks.schedule(makeChunkTask(start, next));
    start = next;
  }

  // Final chunk: run it on the calling thread when the caller waits; otherwise force it onto
  // the pool's queue so this call returns without doing the work itself.
  Iter end = start;
  std::advance(end, chunkSize);
  if (options.wait) {
    for (Iter it = start; it != end; ++it) {
      f(*it);
    }
    tasks.wait();
  } else {
    tasks.schedule(makeChunkTask(start, end), ForceQueuingTag());
  }
}