SemiFuture unorderedReduceSemiFuture()

in folly/futures/Future-inl.h [1850:1946]


SemiFuture<T> unorderedReduceSemiFuture(It first, It last, T initial, F func) {
  using ItF = typename std::iterator_traits<It>::value_type;
  using ItT = typename ItF::value_type;
  using Arg = MaybeTryArg<F, T, ItT>;

  if (first == last) {
    return makeFuture(std::move(initial));
  }

  typedef isTry<Arg> IsTry;

  struct Context {
    Context(T&& memo, F&& fn, size_t n)
        : lock_(),
          memo_(makeFuture<T>(std::move(memo))),
          func_(std::move(fn)),
          numThens_(0),
          numFutures_(n),
          promise_() {}

    folly::MicroSpinLock lock_; // protects memo_ and numThens_
    Future<T> memo_;
    F func_;
    size_t numThens_; // how many Futures completed and called .then()
    size_t numFutures_; // how many Futures in total
    Promise<T> promise_;
  };

  struct Fulfill {
    void operator()(Promise<T>&& p, T&& v) const { p.setValue(std::move(v)); }
    void operator()(Promise<T>&& p, Future<T>&& f) const {
      f.setCallback_(
          [p = std::move(p)](Executor::KeepAlive<>&&, Try<T>&& t) mutable {
            p.setTry(std::move(t));
          });
    }
  };

  std::vector<futures::detail::DeferredWrapper> executors;
  futures::detail::stealDeferredExecutors(executors, first, last);

  auto ctx = std::make_shared<Context>(
      std::move(initial), std::move(func), std::distance(first, last));
  for (size_t i = 0; first != last; ++first, ++i) {
    first->setCallback_([i, ctx](Executor::KeepAlive<>&&, Try<ItT>&& t) {
      (void)i;
      // Futures can be completed in any order, simultaneously.
      // To make this non-blocking, we create a new Future chain in
      // the order of completion to reduce the values.
      // The spinlock just protects chaining a new Future, not actually
      // executing the reduce, which should be really fast.
      Promise<T> p;
      auto f = p.getFuture();
      {
        folly::MSLGuard lock(ctx->lock_);
        f = std::exchange(ctx->memo_, std::move(f));
        if (++ctx->numThens_ == ctx->numFutures_) {
          // After reducing the value of the last Future, fulfill the Promise
          ctx->memo_.setCallback_([ctx](Executor::KeepAlive<>&&, Try<T>&& t2) {
            ctx->promise_.setValue(std::move(t2));
          });
        }
      }
      f.setCallback_([ctx, mp = std::move(p), mt = std::move(t)](
                         Executor::KeepAlive<>&&, Try<T>&& v) mutable {
        if (v.hasValue()) {
          exception_wrapper ew;
          try {
            Fulfill{}(
                std::move(mp),
                ctx->func_(
                    std::move(v.value()),
                    mt.template get<IsTry::value, Arg&&>()));
          } catch (...) {
            ew = exception_wrapper{std::current_exception()};
          }
          if (ew) {
            mp.setException(std::move(ew));
          }
        } else {
          mp.setTry(std::move(v));
        }
      });
    });
  }

  auto future = ctx->promise_.getSemiFuture();
  if (!executors.empty()) {
    future = std::move(future).defer(
        [](Try<typename decltype(future)::value_type>&& t) {
          return std::move(t).value();
        });
    const auto& deferredExecutor = futures::detail::getDeferredExecutor(future);
    deferredExecutor->setNestedExecutors(std::move(executors));
  }
  return future;
}