void for_each_n()

in dispenso/for_each.h [55:128]


void for_each_n(TaskSetT& tasks, Iter start, size_t n, F&& f, ForEachOptions options = {}) {
  // Applies f(*it) to each of the n elements beginning at `start`. The range is split into
  // contiguous chunks, at most one per participating thread, which are scheduled on `tasks`.
  //
  //   tasks   - task set whose pool runs the chunks.
  //   start   - first element. Chunk boundaries are found with std::advance and each chunk is
  //             iterated independently, so Iter presumably must be (at least) a forward
  //             iterator; random-access iterators make std::advance O(1).
  //   n       - number of elements to visit.
  //   f       - invoked once per element. It is copied into every scheduled chunk, so the
  //             chunks never refer to the caller's f, even when options.wait is false.
  //   options - maxThreads == 0 forces serial execution on the calling thread. With
  //             wait == true, the calling thread processes the last chunk itself and then
  //             blocks in tasks.wait(). Otherwise every chunk is queued and this returns
  //             immediately.
  //
  // TODO(bbudge): With options.maxThreads, we might want to allow a small fanout factor in
  // recursive case?

  // Serial path: nothing to do, a single element the caller will wait on anyway, serial
  // execution requested, or a nested call detected on this pool by isParForRecursive.
  if (!n || (n == 1 && options.wait) || !options.maxThreads ||
      detail::PerPoolPerThreadInfo::isParForRecursive(&tasks.pool())) {
    for (size_t i = 0; i < n; ++i) {
      f(*start);
      ++start;
    }
    if (options.wait) {
      tasks.wait();
    }
    return;
  }

  // When waiting, the calling thread processes the final chunk itself (see the end of this
  // function), so it counts as one worker in addition to the pool threads. Without this, one
  // pool thread would be left with no chunk to run.
  ssize_t numThreads = std::min<ssize_t>(
      static_cast<ssize_t>(tasks.numPoolThreads()) + (options.wait ? 1 : 0), options.maxThreads);
  // Reduce threads used if they exceed work to be done.
  numThreads = std::min<ssize_t>(numThreads, n);
  // At least one chunk is always produced below. A zero-thread pool (with wait == false) would
  // otherwise yield a chunk count of 0, which the chunking math cannot handle.
  numThreads = std::max<ssize_t>(numThreads, 1);

  auto chunking = detail::staticChunkSize(n, numThreads);
  size_t chunkSize = chunking.ceilChunkSize;

  // Chunks before transitionTaskIndex hold ceilChunkSize elements, and later chunks hold one
  // fewer. If the transition index equals the chunk count, every chunk is the same size.
  bool perfectlyChunked = chunking.transitionTaskIndex == numThreads;

  // Builds the task for the half-open chunk [chunkBegin, chunkEnd), holding its own copy of f.
  // The parForRecurse() guard pairs with the isParForRecursive() check above. Presumably it flags
  // the running thread so that nested for_each_n calls made by f take the serial path.
  auto makeChunkTask = [&f](Iter chunkBegin, Iter chunkEnd) {
    return [chunkBegin, chunkEnd, f]() {
      auto recurseInfo = detail::PerPoolPerThreadInfo::parForRecurse();
      for (Iter it = chunkBegin; it != chunkEnd; ++it) {
        f(*it);
      }
    };
  };

  // Equivalent to (!perfectlyChunked) ? chunking.transitionTaskIndex : numThreads - 1. The final
  // chunk is always handled separately below.
  ssize_t firstLoopLen = chunking.transitionTaskIndex - perfectlyChunked;

  ssize_t t = 0;
  for (; t < firstLoopLen; ++t) {
    Iter next = start;
    std::advance(next, chunkSize);
    tasks.schedule(makeChunkTask(start, next));
    start = next;
  }

  // Reduce the remaining chunk sizes by 1 (no-op when every chunk is the same size).
  chunkSize -= !perfectlyChunked;
  // Finish submitting all but the last chunk.
  for (; t < numThreads - 1; ++t) {
    Iter next = start;
    std::advance(next, chunkSize);
    tasks.schedule(makeChunkTask(start, next));
    start = next;
  }

  Iter end = start;
  std::advance(end, chunkSize);

  if (options.wait) {
    // The calling thread does the last chunk itself, then waits for the scheduled ones.
    for (Iter it = start; it != end; ++it) {
      f(*it);
    }
    tasks.wait();
  } else {
    // Not waiting: hand the last chunk to the pool too. ForceQueuingTag presumably keeps
    // schedule() from running it inline, so this call does not block.
    tasks.schedule(makeChunkTask(start, end), ForceQueuingTag());
  }
}