in folly/experimental/coro/Collect-inl.h [682:823]
// Awaits every awaitable produced by `awaitables`, running at most
// `maxConcurrency` of them concurrently, and returns their results in
// input order. If any awaitable completes with an exception (or the
// range's iteration itself throws), cancellation is requested for all
// remaining work and the first such exception is rethrown once all
// in-flight tasks have finished.
//
// NOTE(review): the `template` header for this function lies above this
// chunk; InputRange is the deduced range type.
auto collectAllWindowed(InputRange awaitables, std::size_t maxConcurrency)
-> folly::coro::Task<std::vector<detail::collect_all_range_component_t<
detail::range_reference_t<InputRange>>>> {
assert(maxConcurrency > 0);
// Capture the current executor so every child awaitable (and every mutex
// re-acquisition) resumes back on it via co_viaIfAsync.
const folly::Executor::KeepAlive<> executor = co_await co_current_executor;
// Merge the caller's cancellation token with our own source so that either
// external cancellation or an internal failure cancels all children.
const CancellationToken& parentCancelToken =
co_await co_current_cancellation_token;
const CancellationSource cancelSource;
const CancellationToken cancelToken =
CancellationToken::merge(parentCancelToken, cancelSource.getToken());
exception_wrapper firstException;
// First-failure-wins: requestCancellation() returns false only for the
// first caller, so exactly one exception is recorded without extra locking.
auto trySetFirstException = [&](exception_wrapper&& e) noexcept {
if (!cancelSource.requestCancellation()) {
// This is first entity to request cancellation.
firstException = std::move(e);
}
};
auto iter = access::begin(awaitables);
const auto iterEnd = access::end(awaitables);
using iterator_t = decltype(iter);
using awaitable_t = typename std::iterator_traits<iterator_t>::value_type;
// `mutex` guards `iter`, `tryResults` and `iterationException` — all shared
// between the worker coroutines below.
folly::coro::Mutex mutex;
// Results are stored as Try-like optionals first; a slot is reserved (by
// emplace_back below) before the corresponding awaitable runs, so indices
// match input order even though completions interleave.
std::vector<detail::collect_all_try_range_component_t<
detail::range_reference_t<InputRange>>>
tryResults;
exception_wrapper iterationException;
// Each worker repeatedly: (under the mutex) claims the next awaitable and
// reserves its result slot, then (unlocked) awaits it, then (re-locked)
// stores the result. Workers exit when the range is exhausted or iteration
// has failed.
auto makeWorker = [&]() -> detail::BarrierTask {
auto lock =
co_await co_viaIfAsync(executor.get_alias(), mutex.co_scoped_lock());
while (!iterationException && iter != iterEnd) {
// The slot index for this awaitable; emplace_back below claims it.
const std::size_t thisIndex = tryResults.size();
std::optional<awaitable_t> awaitable;
try {
tryResults.emplace_back();
// Copying/dereferencing the iterator or advancing it may throw —
// treat that as a fatal iteration error and cancel everything.
awaitable.emplace(*iter);
++iter;
} catch (...) {
iterationException = exception_wrapper{std::current_exception()};
cancelSource.requestCancellation();
}
if (!awaitable) {
co_return;
}
// Release the mutex while awaiting so other workers can claim work.
lock.unlock();
detail::collect_all_try_range_component_t<
detail::range_reference_t<InputRange>>
tryResult;
try {
tryResult.emplace(co_await co_viaIfAsync(
executor.get_alias(),
co_withCancellation(
cancelToken, static_cast<awaitable_t&&>(*awaitable))));
} catch (...) {
trySetFirstException(exception_wrapper{std::current_exception()});
}
// Re-acquire before touching shared state again.
lock =
co_await co_viaIfAsync(executor.get_alias(), mutex.co_scoped_lock());
try {
// Move into the pre-reserved slot; the move-assignment itself may
// throw, which counts as a task failure like any other.
tryResults[thisIndex] = std::move(tryResult);
} catch (...) {
trySetFirstException(exception_wrapper{std::current_exception()});
}
}
};
std::vector<detail::BarrierTask> workerTasks;
// Barrier starts at 1 for this (parent) coroutine; each spawned worker
// adds 1 and arrives on completion. arriveAndWait() below releases our own
// count and suspends until all workers are done.
detail::Barrier barrier{1};
exception_wrapper workerCreationException;
// Save the initial context and restore it after starting each task
// as the task may have modified the context before suspending and we
// want to make sure the next task is started with the same initial
// context.
const auto context = RequestContext::saveContext();
auto& asyncFrame = co_await detail::co_current_async_stack_frame;
try {
auto lock = co_await mutex.co_scoped_lock();
while (!iterationException && iter != iterEnd &&
workerTasks.size() < maxConcurrency) {
// Unlock the mutex before starting the worker so that
// it can consume as many results synchronously as it can before
// returning here and letting us spawn another task.
// This can avoid spawning more worker coroutines than is necessary
// to consume all of the awaitables.
lock.unlock();
workerTasks.push_back(makeWorker());
barrier.add(1);
workerTasks.back().start(&barrier, asyncFrame);
RequestContext::setContext(context);
lock = co_await mutex.co_scoped_lock();
}
} catch (...) {
// Only a fatal error if we failed to create any worker tasks.
if (workerTasks.empty()) {
// No need to synchronise here. There are no concurrent tasks running.
iterationException = exception_wrapper{std::current_exception()};
}
}
// Wait for all workers to finish. Resuming inline is safe here because the
// final arrival happens on the executor the workers resume on.
// NOTE(review): inline-resume safety relies on Barrier/BarrierTask
// internals not visible in this chunk — confirm against their docs.
co_await detail::UnsafeResumeInlineSemiAwaitable{barrier.arriveAndWait()};
// Iteration failure takes precedence over a child-task failure.
if (auto& ex = iterationException ? iterationException : firstException) {
co_yield co_error(std::move(ex));
}
// No exception anywhere: every slot must hold a value; unwrap them into
// the final result vector, preserving input order.
std::vector<detail::collect_all_range_component_t<
detail::range_reference_t<InputRange>>>
results;
results.reserve(tryResults.size());
for (auto&& tryResult : tryResults) {
assert(tryResult.hasValue());
results.emplace_back(std::move(tryResult).value());
}
co_return results;
}