in dispenso/parallel_for.h [170:221]
template <typename TaskSetT, typename IntegerT, typename F>
void parallel_for_staticImpl(
    TaskSetT& taskSet,
    const ChunkedRange<IntegerT>& range,
    F&& f,
    ParForOptions options) {
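  // Statically partition [range.start, range.end) into one contiguous chunk
  // per thread, scheduling every chunk up front (no dynamic load balancing).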
  ssize_t numThreads = std::min<ssize_t>(taskSet.numPoolThreads(), options.maxThreads);
  // Reduce threads used if they exceed work to be done.
  numThreads = std::min<ssize_t>(numThreads, range.size());
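  // From here on, numThreads is also the number of chunks: numThreads - 1 are
  // scheduled in the loops below, and the final chunk is handled at the end.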
  auto chunking = detail::staticChunkSize(range.size(), numThreads);
  IntegerT chunkSize = static_cast<IntegerT>(chunking.ceilChunkSize);
  bool perfectlyChunked = chunking.transitionTaskIndex == numThreads;

  // Equivalent to (!perfectlyChunked) ? chunking.transitionTaskIndex : numThreads - 1.
  // When the range divides evenly, all chunks share one size and only the last
  // is deferred; otherwise the first transitionTaskIndex chunks get the larger
  // (ceil) size and the rest get one element fewer.
  ssize_t firstLoopLen = static_cast<ssize_t>(chunking.transitionTaskIndex) - perfectlyChunked;
  IntegerT start = range.start;
  ssize_t t;
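  // Schedule the chunks that carry the larger (ceil) chunk size.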
  for (t = 0; t < firstLoopLen; ++t) {
    IntegerT next = static_cast<IntegerT>(start + chunkSize);
    taskSet.schedule([start, next, f]() {
      // RAII guard marking this thread as inside a parallel_for, so nested
      // parallel_for calls can detect the recursion and run inline.
      auto recurseInfo = detail::PerPoolPerThreadInfo::parForRecurse();
      f(start, next);
    });
    start = next;
  }
  // Reduce the remaining chunk sizes by 1: tasks past the transition index get
  // the smaller (floor) chunk size. This is a no-op when perfectly chunked.
  chunkSize = static_cast<IntegerT>(chunkSize - !perfectlyChunked);
  // Finish submitting all but the last item.
  for (; t < numThreads - 1; ++t) {
    IntegerT next = static_cast<IntegerT>(start + chunkSize);
    taskSet.schedule([start, next, f]() {
      auto recurseInfo = detail::PerPoolPerThreadInfo::parForRecurse();
      f(start, next);
    });
    start = next;
  }
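  // The final chunk covers [start, range.end). If the caller waits, run it
  // inline on this thread before waiting; otherwise queue it like the rest.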
  if (options.wait) {
    f(start, range.end);
    taskSet.wait();
  } else {
    taskSet.schedule(
        [start, end = range.end, f]() {
          auto recurseInfo = detail::PerPoolPerThreadInfo::parForRecurse();
          f(start, end);
        },
        // ForceQueuingTag: queue the task to the pool rather than letting it
        // execute immediately on this thread, since the caller is not waiting.
        ForceQueuingTag());
  }
}
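
A minimal sketch of the chunk arithmetic the code above relies on. It models
what detail::staticChunkSize must return for the two loops to cover the range
exactly; this is an illustration reconstructed from the call sites, not the
library's actual implementation. The field names ceilChunkSize and
transitionTaskIndex are taken from the usage above; the function name and the
computation itself are assumptions.

#include <cassert>
#include <cstddef>

struct StaticChunking {
  std::ptrdiff_t ceilChunkSize;       // size of the first transitionTaskIndex chunks
  std::ptrdiff_t transitionTaskIndex; // chunk count before dropping to the floor size
};

// Hypothetical model of detail::staticChunkSize, for illustration only.
StaticChunking modelStaticChunkSize(std::ptrdiff_t size, std::ptrdiff_t numThreads) {
  // The caller clamps numThreads to range.size(), so size >= numThreads here.
  assert(numThreads > 0 && size >= numThreads);
  StaticChunking c;
  c.ceilChunkSize = (size + numThreads - 1) / numThreads;
  // When size divides evenly, every chunk is equal (perfectlyChunked above);
  // otherwise the first size % numThreads chunks carry one extra element.
  c.transitionTaskIndex = (size % numThreads == 0) ? numThreads : size % numThreads;
  return c;
}

For example, size = 10 with numThreads = 4 yields ceilChunkSize = 3 and
transitionTaskIndex = 2, so the loops above produce chunks of 3, 3, 2, and 2
elements, with the last chunk executed via the options.wait branch.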