in folly/experimental/coro/Collect-inl.h [826:948]
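// Concurrently awaits each awaitable in 'awaitables', keeping at most
// maxConcurrency of them in flight at any one time. Produces one Try per
// awaitable, in iteration order, so an individual failure is captured in
// its Try rather than failing the whole operation. An exception thrown
// while iterating the range itself is rethrown once all started work has
// completed.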
template <typename InputRange>
auto collectAllTryWindowed(InputRange awaitables, std::size_t maxConcurrency)
-> folly::coro::Task<std::vector<detail::collect_all_try_range_component_t<
detail::range_reference_t<InputRange>>>> {
assert(maxConcurrency > 0);
std::vector<detail::collect_all_try_range_component_t<
detail::range_reference_t<InputRange>>>
results;
exception_wrapper iterationException;
folly::coro::Mutex mutex;
const Executor::KeepAlive<> executor = co_await co_current_executor;
const CancellationToken& cancelToken = co_await co_current_cancellation_token;
auto iter = access::begin(awaitables);
const auto iterEnd = access::end(awaitables);
using iterator_t = decltype(iter);
using awaitable_t = typename std::iterator_traits<iterator_t>::value_type;
using result_t = semi_await_result_t<awaitable_t>;
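  // Each worker coroutine repeatedly claims the next awaitable under the
  // mutex (which guards 'iter', 'iterationException' and 'results'),
  // releases the lock while awaiting it, and then re-acquires the lock to
  // store the outcome.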
auto makeWorker = [&]() -> detail::BarrierTask {
auto lock =
co_await co_viaIfAsync(executor.get_alias(), mutex.co_scoped_lock());
while (!iterationException && iter != iterEnd) {
const std::size_t thisIndex = results.size();
std::optional<awaitable_t> awaitable;
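      // Reserving the result slot, dereferencing the iterator and
      // advancing it may each throw; record any exception so that all
      // workers stop claiming new awaitables.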
try {
results.emplace_back();
awaitable.emplace(*iter);
++iter;
} catch (...) {
iterationException = exception_wrapper{std::current_exception()};
}
if (!awaitable) {
co_return;
}
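      // Release the lock while awaiting so that other workers can claim
      // the next awaitable concurrently.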
lock.unlock();
detail::collect_all_try_range_component_t<
detail::range_reference_t<InputRange>>
result;
try {
if constexpr (std::is_void_v<result_t>) {
co_await co_viaIfAsync(
executor.get_alias(),
co_withCancellation(cancelToken, std::move(*awaitable)));
result.emplace();
} else {
result.emplace(co_await co_viaIfAsync(
executor.get_alias(),
co_withCancellation(cancelToken, std::move(*awaitable))));
}
} catch (...) {
result.emplaceException(std::current_exception());
}
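      // Re-acquire the lock before writing to the shared results vector.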
lock =
co_await co_viaIfAsync(executor.get_alias(), mutex.co_scoped_lock());
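      // Try's move-assignment may itself throw (e.g. if the value type's
      // move constructor throws); if it does, store that exception in the
      // result slot instead.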
try {
results[thisIndex] = std::move(result);
} catch (...) {
results[thisIndex].emplaceException(std::current_exception());
}
}
};
std::vector<detail::BarrierTask> workerTasks;
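  // The barrier starts with a count of one on behalf of this coroutine;
  // each spawned worker adds one more and signals the barrier when it
  // completes.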
detail::Barrier barrier{1};
  // Save the initial RequestContext and restore it after starting each
  // task, as the task may have modified the context before suspending and
  // we want to make sure the next task is started with the same initial
  // context.
const auto context = RequestContext::saveContext();
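  // Capture the current async stack frame so that the worker tasks are
  // linked to it in async stack traces.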
auto& asyncFrame = co_await detail::co_current_async_stack_frame;
try {
auto lock = co_await mutex.co_scoped_lock();
while (!iterationException && iter != iterEnd &&
workerTasks.size() < maxConcurrency) {
      // Unlock the mutex before starting the worker so that it can
      // synchronously consume as many awaitables as it can before
      // returning here and letting us potentially spawn another task.
      // This can avoid spawning more worker coroutines than are necessary
      // to consume all of the awaitables.
lock.unlock();
workerTasks.push_back(makeWorker());
barrier.add(1);
workerTasks.back().start(&barrier, asyncFrame);
RequestContext::setContext(context);
lock = co_await mutex.co_scoped_lock();
}
} catch (...) {
    // Failure to create a worker is only an error if we failed to create
    // _any_ workers. As long as we created at least one, the algorithm
    // can still make forward progress.
if (workerTasks.empty()) {
iterationException = exception_wrapper{std::current_exception()};
}
}
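  // Release this coroutine's own count on the barrier and wait until all
  // worker tasks have finished.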
co_await detail::UnsafeResumeInlineSemiAwaitable{barrier.arriveAndWait()};
if (iterationException) {
co_yield co_error(std::move(iterationException));
}
co_return results;
}
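
// A minimal usage sketch, assuming illustrative names (exampleUsage and the
// squaring lambda are not part of folly): at most 4 tasks run at a time,
// and each outcome arrives as a folly::Try, so a single failure does not
// discard the other results.
//
//   folly::coro::Task<void> exampleUsage() {
//     std::vector<folly::coro::Task<int>> tasks;
//     for (int i = 0; i < 10; ++i) {
//       tasks.push_back(folly::coro::co_invoke(
//           [i]() -> folly::coro::Task<int> { co_return i * i; }));
//     }
//     std::vector<folly::Try<int>> results =
//         co_await folly::coro::collectAllTryWindowed(std::move(tasks), 4);
//     for (auto& result : results) {
//       if (result.hasException()) {
//         // Handle or log this input's failure; other results are intact.
//       }
//     }
//   }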