void parallel_for()

in dispenso/parallel_for.h [301:394]


template <typename TaskSetT, typename IntegerT, typename F>
void parallel_for(
    TaskSetT& taskSet,
    const ChunkedRange<IntegerT>& range,
    F&& f,
    ParForOptions options = {}) {
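  // An empty range has no work to run; still honor options.wait so callers can
  // synchronize with any previously scheduled tasks.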
  if (range.empty()) {
    if (options.wait) {
      taskSet.wait();
    }
    return;
  }
  // TODO(bbudge): With options.maxThreads, we might want to allow a small fanout factor in
  // recursive case?
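  // Run the whole range serially on the calling thread when parallelism is
  // disabled (options.maxThreads == 0), the range cannot be split further, or
  // this call is nested inside another parallel_for on the same pool.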
  if (!options.maxThreads || range.size() == 1 ||
      detail::PerPoolPerThreadInfo::isParForRecursive(&taskSet.pool())) {
    f(range.start, range.end);
    if (options.wait) {
      taskSet.wait();
    }
    return;
  }

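  // Statically chunked ranges are partitioned up front by the static
  // implementation rather than claimed dynamically below.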
  if (range.isStatic()) {
    detail::parallel_for_staticImpl(taskSet, range, std::forward<F>(f), options);
    return;
  }

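  // Decide how many worker tasks to launch. When the caller will wait, the
  // calling thread also participates, so one fewer task is needed; the total
  // is capped by options.maxThreads.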
  const ssize_t N = taskSet.numPoolThreads();
  const bool useCallingThread = options.wait;
  const ssize_t numToLaunch = std::min<ssize_t>(options.maxThreads, N - useCallingThread);

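  // Degenerate cases: a single worker (with no calling-thread participation)
  // gets the whole range as one task; with no workers at all, run inline.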
  if (numToLaunch == 1 && !useCallingThread) {
    taskSet.schedule([range, f = std::move(f)]() { f(range.start, range.end); });
    if (options.wait) {
      taskSet.wait();
    }
    return;
  } else if (numToLaunch == 0) {
    f(range.start, range.end);
    if (options.wait) {
      taskSet.wait();
    }
    return;
  }

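  // Chunk size used by the workers when claiming pieces of the range below.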
  const IntegerT chunk = range.calcChunkSize(numToLaunch, useCallingThread);

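  // Waiting path: the shared index can live on the stack because this call
  // blocks until all workers finish; alignas keeps it on its own cache line.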
  if (options.wait) {
    alignas(kCacheLineSize) std::atomic<IntegerT> index(range.start);
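    // Each worker repeatedly claims the next chunk by atomically advancing the
    // shared index, and stops once the range is exhausted.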
    auto worker = [end = range.end, &index, f = std::move(f), chunk]() {
      auto recurseInfo = detail::PerPoolPerThreadInfo::parForRecurse();
      while (true) {
        IntegerT cur = index.fetch_add(chunk, std::memory_order_relaxed);
        if (cur >= end) {
          break;
        }
        f(cur, std::min<IntegerT>(static_cast<IntegerT>(cur + chunk), end));
      }
    };

    using Function = decltype(worker);

    for (ssize_t i = 0; i < numToLaunch; ++i) {
      taskSet.schedule(Function(worker));
    }
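    // The calling thread works through chunks as well, then blocks until the
    // scheduled workers have drained the range.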
    worker();
    taskSet.wait();
  } else {
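    // Non-waiting path: the shared index must outlive this call, so it is
    // heap-allocated, shared with every worker, and padded on both sides to a
    // full cache line to avoid false sharing.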
    struct Atomic {
      Atomic(IntegerT i) : index(i) {}
      char buffer[kCacheLineSize];
      std::atomic<IntegerT> index;
      char buffer2[kCacheLineSize];
    };
    // TODO(bbudge): dispenso::make_shared?
    auto wrapper = std::make_shared<Atomic>(range.start);
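    // Same chunk-claiming loop as in the waiting path, but against the
    // heap-allocated index.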
    auto worker = [end = range.end, wrapper = std::move(wrapper), f, chunk]() {
      auto recurseInfo = detail::PerPoolPerThreadInfo::parForRecurse();
      while (true) {
        IntegerT cur = wrapper->index.fetch_add(chunk, std::memory_order_relaxed);
        if (cur >= end) {
          break;
        }
        f(cur, std::min<IntegerT>(static_cast<IntegerT>(cur + chunk), end));
      }
    };

    using Function = decltype(worker);

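    // ForceQueuingTag ensures the work is queued to the pool rather than
    // potentially executed inline, since the caller is not waiting here.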
    for (ssize_t i = 0; i < numToLaunch; ++i) {
      taskSet.schedule(Function(worker), ForceQueuingTag());
    }
  }
}
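
A minimal usage sketch follows. It is not taken from the source above: the TaskSet construction from dispenso::globalThreadPool() and the makeChunkedRange helper are assumed spellings of the public dispenso API, so check the headers for the exact constructors. What it illustrates is the functor contract visible in the implementation: f receives the [start, end) bounds of each chunk it is handed.

#include <dispenso/parallel_for.h>
#include <dispenso/task_set.h>

#include <cstdint>
#include <vector>

int main() {
  std::vector<double> data(1 << 20, 1.0);
  const double scale = 2.0;

  // Assumed construction: a TaskSet bound to the global thread pool.
  dispenso::TaskSet taskSet(dispenso::globalThreadPool());

  // Assumed helper: makeChunkedRange(begin, end); the exact ChunkedRange
  // construction may differ in the real headers.
  dispenso::parallel_for(
      taskSet,
      dispenso::makeChunkedRange(int64_t{0}, static_cast<int64_t>(data.size())),
      [&data, scale](int64_t begin, int64_t end) {
        // Each invocation processes one dynamically claimed chunk.
        for (int64_t i = begin; i < end; ++i) {
          data[i] *= scale;
        }
      });

  // With the default ParForOptions (wait == true), parallel_for returns only
  // after every chunk has been processed.
  return 0;
}

Passing options with wait == false takes the second branch above, which is why the shared index is heap-allocated there: the call can return before the workers finish.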