void parallel_for()

in dispenso/parallel_for.h [434:532]
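
State-producing overload of parallel_for: every participating thread receives its own state object created by calling defaultState(), and f is invoked as f(state, begin, end) over sub-ranges of the ChunkedRange. The template header (over TaskSetT, StateContainer, StateGen, IntegerT, and F) is elided from this excerpt.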


void parallel_for(
    TaskSetT& taskSet,
    StateContainer& states,
    const StateGen& defaultState,
    const ChunkedRange<IntegerT>& range,
    F&& f,
    ParForOptions options = {}) {
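  // Nothing to do for an empty range; honor the wait option and return.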
  if (range.empty()) {
    if (options.wait) {
      taskSet.wait();
    }
    return;
  }
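  // Run f serially on the calling thread when concurrency is disabled
  // (maxThreads == 0), the range has size 1, or this call is nested inside
  // another parallel_for on the same pool.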
  if (!options.maxThreads || range.size() == 1 ||
      detail::PerPoolPerThreadInfo::isParForRecursive(&taskSet.pool())) {
    states.emplace_back(defaultState());
    f(*states.begin(), range.start, range.end);
    if (options.wait) {
      taskSet.wait();
    }
    return;
  }

  // Statically chunked ranges are handled by a dedicated implementation.
  if (range.isStatic()) {
    detail::parallel_for_staticImpl(
        taskSet, states, defaultState, range, std::forward<F>(f), options);
    return;
  }

  // Dynamically chunked path: decide how many pool threads to launch. The
  // calling thread participates only when the caller is going to wait.
  const ssize_t N = taskSet.numPoolThreads();
  const bool useCallingThread = options.wait;
  const ssize_t numToLaunch = std::min<ssize_t>(options.maxThreads, N - useCallingThread);

  for (ssize_t i = 0; i < numToLaunch + useCallingThread; ++i) {
    states.emplace_back(defaultState());
  }

  // Degenerate launch counts: a single scheduled task can take the whole
  // range, and zero launched threads means the calling thread does the work.
  if (numToLaunch == 1 && !useCallingThread) {
    taskSet.schedule(
        [&s = states.front(), range, f = std::move(f)]() { f(s, range.start, range.end); });
    if (options.wait) {
      taskSet.wait();
    }
    return;
  } else if (numToLaunch == 0) {
    f(*states.begin(), range.start, range.end);
    if (options.wait) {
      taskSet.wait();
    }
    return;
  }

  // Workers claim chunks of this size by atomically advancing a shared index.
  const IntegerT chunk = range.calcChunkSize(numToLaunch, useCallingThread);

  // Waiting case: the shared index can live on the stack because
  // taskSet.wait() ensures every worker finishes before this frame returns.
  if (options.wait) {
    alignas(kCacheLineSize) std::atomic<IntegerT> index(range.start);
    auto worker = [end = range.end, &index, f, chunk](auto& s) {
      auto recurseInfo = detail::PerPoolPerThreadInfo::parForRecurse();

      while (true) {
        IntegerT cur = index.fetch_add(chunk, std::memory_order_relaxed);
        if (cur >= end) {
          break;
        }
        f(s, cur, std::min<IntegerT>(static_cast<IntegerT>(cur + chunk), end));
      }
    };

    // Schedule numToLaunch pool workers; the calling thread runs the same
    // worker on the last state before waiting.
    auto it = states.begin();
    for (ssize_t i = 0; i < numToLaunch; ++i) {
      taskSet.schedule([&s = *it++, worker]() { worker(s); });
    }
    worker(*it);
    taskSet.wait();
  } else {
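    // Non-waiting case: this frame may unwind before the workers run, so the
    // shared index is heap-allocated, padded against false sharing, and kept
    // alive by each worker's copy of the shared_ptr. ForceQueuingTag ensures
    // the tasks are queued rather than run inline on the calling thread.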
    struct Atomic {
      Atomic(IntegerT i) : index(i) {}
      char buffer[kCacheLineSize];
      std::atomic<IntegerT> index;
      char buffer2[kCacheLineSize];
    };
    auto wrapper = std::make_shared<Atomic>(range.start);
    auto worker = [end = range.end, wrapper = std::move(wrapper), f, chunk](auto& s) {
      auto recurseInfo = detail::PerPoolPerThreadInfo::parForRecurse();
      while (true) {
        IntegerT cur = wrapper->index.fetch_add(chunk, std::memory_order_relaxed);
        if (cur >= end) {
          break;
        }
        f(s, cur, std::min<IntegerT>(static_cast<IntegerT>(cur + chunk), end));
      }
    };

    auto it = states.begin();
    for (ssize_t i = 0; i < numToLaunch; ++i) {
      taskSet.schedule([&s = *it++, worker]() { worker(s); }, ForceQueuingTag());
    }
  }
}
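
A minimal usage sketch, not part of the header above: it assumes dispenso's globalThreadPool(), TaskSet, and makeChunkedRange from the library's public API, and that default-constructed ParForOptions request waiting. Each thread accumulates into its own int64_t state; the partial sums are combined after the call.

#include <dispenso/parallel_for.h>

#include <cstdint>
#include <vector>

int64_t sumWithStates(const std::vector<int64_t>& data) {
  dispenso::TaskSet taskSet(dispenso::globalThreadPool());

  // One partial sum per participating thread, produced by the state generator.
  std::vector<int64_t> partials;

  dispenso::parallel_for(
      taskSet,
      partials,
      []() { return int64_t{0}; },
      dispenso::makeChunkedRange(size_t{0}, data.size()),
      [&data](int64_t& partial, size_t begin, size_t end) {
        for (size_t i = begin; i != end; ++i) {
          partial += data[i];
        }
      });
  // With wait requested (the default), all partial sums are complete here.

  int64_t total = 0;
  for (int64_t p : partials) {
    total += p;
  }
  return total;
}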