in folly/futures/Future-inl.h [1850:1946]
SemiFuture<T> unorderedReduceSemiFuture(It first, It last, T initial, F func) {
using ItF = typename std::iterator_traits<It>::value_type;
using ItT = typename ItF::value_type;
using Arg = MaybeTryArg<F, T, ItT>;
if (first == last) {
return makeFuture(std::move(initial));
}
typedef isTry<Arg> IsTry;
struct Context {
Context(T&& memo, F&& fn, size_t n)
: lock_(),
memo_(makeFuture<T>(std::move(memo))),
func_(std::move(fn)),
numThens_(0),
numFutures_(n),
promise_() {}
folly::MicroSpinLock lock_; // protects memo_ and numThens_
Future<T> memo_;
F func_;
size_t numThens_; // how many Futures completed and called .then()
size_t numFutures_; // how many Futures in total
Promise<T> promise_;
};
struct Fulfill {
void operator()(Promise<T>&& p, T&& v) const { p.setValue(std::move(v)); }
void operator()(Promise<T>&& p, Future<T>&& f) const {
f.setCallback_(
[p = std::move(p)](Executor::KeepAlive<>&&, Try<T>&& t) mutable {
p.setTry(std::move(t));
});
}
};
std::vector<futures::detail::DeferredWrapper> executors;
futures::detail::stealDeferredExecutors(executors, first, last);
auto ctx = std::make_shared<Context>(
std::move(initial), std::move(func), std::distance(first, last));
for (size_t i = 0; first != last; ++first, ++i) {
first->setCallback_([i, ctx](Executor::KeepAlive<>&&, Try<ItT>&& t) {
(void)i;
// Futures can be completed in any order, simultaneously.
// To make this non-blocking, we create a new Future chain in
// the order of completion to reduce the values.
// The spinlock just protects chaining a new Future, not actually
// executing the reduce, which should be really fast.
Promise<T> p;
auto f = p.getFuture();
{
folly::MSLGuard lock(ctx->lock_);
f = std::exchange(ctx->memo_, std::move(f));
if (++ctx->numThens_ == ctx->numFutures_) {
// After reducing the value of the last Future, fulfill the Promise
ctx->memo_.setCallback_([ctx](Executor::KeepAlive<>&&, Try<T>&& t2) {
ctx->promise_.setValue(std::move(t2));
});
}
}
f.setCallback_([ctx, mp = std::move(p), mt = std::move(t)](
Executor::KeepAlive<>&&, Try<T>&& v) mutable {
if (v.hasValue()) {
exception_wrapper ew;
try {
Fulfill{}(
std::move(mp),
ctx->func_(
std::move(v.value()),
mt.template get<IsTry::value, Arg&&>()));
} catch (...) {
ew = exception_wrapper{std::current_exception()};
}
if (ew) {
mp.setException(std::move(ew));
}
} else {
mp.setTry(std::move(v));
}
});
});
}
auto future = ctx->promise_.getSemiFuture();
if (!executors.empty()) {
future = std::move(future).defer(
[](Try<typename decltype(future)::value_type>&& t) {
return std::move(t).value();
});
const auto& deferredExecutor = futures::detail::getDeferredExecutor(future);
deferredExecutor->setNestedExecutors(std::move(executors));
}
return future;
}