in dispenso/parallel_for.h [229:287]
void parallel_for_staticImpl(
    TaskSetT& taskSet,
    StateContainer& states,
    const StateGen& defaultState,
    const ChunkedRange<IntegerT>& range,
    F&& f,
    ParForOptions options) {
  // Statically partitions `range` into `numThreads` contiguous chunks and invokes
  // `f(state, chunkBegin, chunkEnd)` once per chunk, each chunk with its own state.
  //
  // - One state per chunk is appended to `states` via `defaultState()`. Chunks are paired with
  //   states starting at `states.begin()` (NOTE(review): presumably the caller passes an empty
  //   container; any pre-existing elements would be handed out first).
  // - Scheduled tasks hold iterators into `states` and a copy of `f`, so `states` must stay alive
  //   and its iterators valid until all of the work has completed.
  // - If `options.wait` is set, the final chunk runs on the calling thread and then the task set
  //   is waited on. Otherwise the final chunk is scheduled as well (with ForceQueuingTag) and
  //   this function returns without waiting.
  // - NOTE(review): requires numThreads > 0 after clamping (non-empty range, maxThreads > 0);
  //   with zero chunks no state is created and the final `*stateIt` dereference is invalid.
  //   Presumably callers guard against this.

  // When waiting, the calling thread executes the final chunk itself, so it counts as one extra
  // worker on top of the pool threads. Without this, one pool thread would sit idle for the
  // whole loop. The total is still capped by options.maxThreads.
  ssize_t numThreads = std::min<ssize_t>(
      taskSet.numPoolThreads() + (options.wait ? 1 : 0), options.maxThreads);
  // Reduce threads used if they exceed work to be done.
  numThreads = std::min<ssize_t>(numThreads, range.size());

  for (ssize_t i = 0; i < numThreads; ++i) {
    states.emplace_back(defaultState());
  }

  auto chunking = detail::staticChunkSize(static_cast<ssize_t>(range.size()), numThreads);
  IntegerT chunkSize = static_cast<IntegerT>(chunking.ceilChunkSize);
  bool perfectlyChunked = static_cast<ssize_t>(chunking.transitionTaskIndex) == numThreads;
  // Branch-free form of: perfectlyChunked ? numThreads - 1 : chunking.transitionTaskIndex.
  // The last chunk is never submitted by the loops below; it is handled separately at the end.
  ssize_t firstLoopLen = chunking.transitionTaskIndex - perfectlyChunked;

  auto stateIt = states.begin();
  IntegerT start = range.start;

  // Schedules [start, start + len) on the task set paired with the next unused state, then
  // advances `start`. Each task owns a copy of `f` and holds the object returned by
  // detail::PerPoolPerThreadInfo::parForRecurse() while `f` runs.
  auto scheduleChunk = [&](IntegerT len) {
    IntegerT next = static_cast<IntegerT>(start + len);
    taskSet.schedule([it = stateIt++, start, next, f]() {
      auto recurseInfo = detail::PerPoolPerThreadInfo::parForRecurse();
      f(*it, start, next);
    });
    start = next;
  };

  ssize_t t = 0;
  // Leading chunks use the ceiling chunk size.
  for (; t < firstLoopLen; ++t) {
    scheduleChunk(chunkSize);
  }
  // Reduce the remaining chunk sizes by 1 (no-op when perfectly chunked).
  chunkSize = static_cast<IntegerT>(chunkSize - !perfectlyChunked);
  // Finish submitting all but the last chunk.
  for (; t < numThreads - 1; ++t) {
    scheduleChunk(chunkSize);
  }

  // The last chunk always ends exactly at range.end, absorbing any rounding.
  if (options.wait) {
    f(*stateIt, start, range.end);
    taskSet.wait();
  } else {
    taskSet.schedule(
        [stateIt, start, end = range.end, f]() {
          auto recurseInfo = detail::PerPoolPerThreadInfo::parForRecurse();
          f(*stateIt, start, end);
        },
        ForceQueuingTag());
  }
}