auto collectAllWindowed()

in folly/experimental/coro/Collect-inl.h [682:823]


// Awaits every awaitable obtained by iterating `awaitables`, using at most
// `maxConcurrency` worker coroutines at a time, and completes with a vector
// of the results ordered by iteration order (not by completion order).
//
// Params:
//   awaitables      - range of awaitables; it is iterated lazily by the
//                     workers, one element at a time, under a mutex.
//   maxConcurrency  - upper bound on the number of workers spawned. Must be
//                     > 0; this is only enforced by `assert`.
//
// Errors:
//   - If iterating the range throws (or no worker could be created at all),
//     that exception is the one the returned task completes with.
//   - Otherwise, if any awaitable (or storing its result) throws, the task
//     completes with the exception of whichever failure was first to request
//     cancellation.
//   In both cases cancellation is requested on a token that is merged with
//   the caller's token and passed to every awaitable.
//
// NOTE(review): `InputRange` is not declared in this excerpt; presumably it
// is the template parameter of the enclosing declaration - confirm.
auto collectAllWindowed(InputRange awaitables, std::size_t maxConcurrency)
    -> folly::coro::Task<std::vector<detail::collect_all_range_component_t<
        detail::range_reference_t<InputRange>>>> {
  assert(maxConcurrency > 0);

  const folly::Executor::KeepAlive<> executor = co_await co_current_executor;

  // Each awaitable is run with a token that fires when either the caller's
  // token fires or this function requests cancellation after a failure.
  const CancellationToken& parentCancelToken =
      co_await co_current_cancellation_token;
  const CancellationSource cancelSource;
  const CancellationToken cancelToken =
      CancellationToken::merge(parentCancelToken, cancelSource.getToken());

  exception_wrapper firstException;

  // Requests cancellation and records `e` only for the caller that was first
  // to request it, so `firstException` is written at most once through this
  // helper.
  auto trySetFirstException = [&](exception_wrapper&& e) noexcept {
    if (!cancelSource.requestCancellation()) {
      // This is first entity to request cancellation.
      firstException = std::move(e);
    }
  };

  auto iter = access::begin(awaitables);
  const auto iterEnd = access::end(awaitables);

  using iterator_t = decltype(iter);
  using awaitable_t = typename std::iterator_traits<iterator_t>::value_type;

  // Guards `iter`, `tryResults` and `iterationException`, which are shared
  // between the workers and the spawning loop below.
  folly::coro::Mutex mutex;

  // One slot per awaitable taken from the range, in iteration order.
  std::vector<detail::collect_all_try_range_component_t<
      detail::range_reference_t<InputRange>>>
      tryResults;

  exception_wrapper iterationException;

  // Creates one worker. A worker repeatedly takes the next awaitable from the
  // range (under the mutex), awaits it (with the mutex released) and stores
  // the outcome into the slot it reserved, until the range is exhausted or
  // iterating the range has failed.
  auto makeWorker = [&]() -> detail::BarrierTask {
    auto lock =
        co_await co_viaIfAsync(executor.get_alias(), mutex.co_scoped_lock());

    while (!iterationException && iter != iterEnd) {
      // Reserve the result slot before taking the awaitable so that slot
      // order matches iteration order.
      const std::size_t thisIndex = tryResults.size();
      std::optional<awaitable_t> awaitable;
      try {
        tryResults.emplace_back();
        awaitable.emplace(*iter);
        ++iter;
      } catch (...) {
        iterationException = exception_wrapper{std::current_exception()};
        cancelSource.requestCancellation();
      }

      // Taking the awaitable failed: `iterationException` is now set, so this
      // worker stops (the other workers' loop conditions observe it too).
      if (!awaitable) {
        co_return;
      }

      // Do not hold the mutex while the awaitable runs, so that other workers
      // can take further elements concurrently.
      lock.unlock();

      // Collect into a local first; `tryResults` may be appended to by other
      // workers while the mutex is released.
      detail::collect_all_try_range_component_t<
          detail::range_reference_t<InputRange>>
          tryResult;

      try {
        tryResult.emplace(co_await co_viaIfAsync(
            executor.get_alias(),
            co_withCancellation(
                cancelToken, static_cast<awaitable_t&&>(*awaitable))));
      } catch (...) {
        trySetFirstException(exception_wrapper{std::current_exception()});
      }

      lock =
          co_await co_viaIfAsync(executor.get_alias(), mutex.co_scoped_lock());

      // Publish under the mutex, addressing the slot by index rather than by
      // a reference taken before the mutex was released.
      try {
        tryResults[thisIndex] = std::move(tryResult);
      } catch (...) {
        trySetFirstException(exception_wrapper{std::current_exception()});
      }
    }
  };

  std::vector<detail::BarrierTask> workerTasks;

  // Starts at 1 and gains 1 per started worker; presumably the initial count
  // stands for this coroutine's own `arriveAndWait()` below - confirm against
  // detail::Barrier.
  detail::Barrier barrier{1};

  // Save the initial context and restore it after starting each task
  // as the task may have modified the context before suspending and we
  // want to make sure the next task is started with the same initial
  // context.
  const auto context = RequestContext::saveContext();

  auto& asyncFrame = co_await detail::co_current_async_stack_frame;

  try {
    auto lock = co_await mutex.co_scoped_lock();

    // Spawn workers only while there is still work left in the range and the
    // concurrency limit has not been reached.
    while (!iterationException && iter != iterEnd &&
           workerTasks.size() < maxConcurrency) {
      // Unlock the mutex before starting the worker so that
      // it can consume as many results synchronously as it can before
      // returning here and letting us spawn another task.
      // This can avoid spawning more worker coroutines than is necessary
      // to consume all of the awaitables.
      lock.unlock();

      workerTasks.push_back(makeWorker());
      barrier.add(1);
      workerTasks.back().start(&barrier, asyncFrame);

      RequestContext::setContext(context);

      lock = co_await mutex.co_scoped_lock();
    }
  } catch (...) {
    // Only a fatal error if we failed to create any worker tasks.
    // Otherwise the exception is dropped and the workers that were already
    // started are left to drain the range.
    if (workerTasks.empty()) {
      // No need to synchronise here. There are no concurrent tasks running.
      iterationException = exception_wrapper{std::current_exception()};
    }
  }

  // Wait until every started worker has finished.
  co_await detail::UnsafeResumeInlineSemiAwaitable{barrier.arriveAndWait()};

  // A failure while iterating the range takes precedence over a failure of an
  // individual awaitable.
  if (auto& ex = iterationException ? iterationException : firstException) {
    co_yield co_error(std::move(ex));
  }

  // No error was recorded, so every slot is expected to hold a value; unwrap
  // them in order.
  std::vector<detail::collect_all_range_component_t<
      detail::range_reference_t<InputRange>>>
      results;
  results.reserve(tryResults.size());

  for (auto&& tryResult : tryResults) {
    assert(tryResult.hasValue());
    results.emplace_back(std::move(tryResult).value());
  }

  co_return results;
}