auto collectAllTryWindowed()

in folly/experimental/coro/Collect-inl.h [826:948]


auto collectAllTryWindowed(InputRange awaitables, std::size_t maxConcurrency)
    -> folly::coro::Task<std::vector<detail::collect_all_try_range_component_t<
        detail::range_reference_t<InputRange>>>> {
  assert(maxConcurrency > 0);

  std::vector<detail::collect_all_try_range_component_t<
      detail::range_reference_t<InputRange>>>
      results;

  exception_wrapper iterationException;

  folly::coro::Mutex mutex;

  const Executor::KeepAlive<> executor = co_await co_current_executor;
  const CancellationToken& cancelToken = co_await co_current_cancellation_token;

  auto iter = access::begin(awaitables);
  const auto iterEnd = access::end(awaitables);

  using iterator_t = decltype(iter);
  using awaitable_t = typename std::iterator_traits<iterator_t>::value_type;
  using result_t = semi_await_result_t<awaitable_t>;

  auto makeWorker = [&]() -> detail::BarrierTask {
    auto lock =
        co_await co_viaIfAsync(executor.get_alias(), mutex.co_scoped_lock());

    while (!iterationException && iter != iterEnd) {
      const std::size_t thisIndex = results.size();
      std::optional<awaitable_t> awaitable;

      try {
        results.emplace_back();
        awaitable.emplace(*iter);
        ++iter;
      } catch (...) {
        iterationException = exception_wrapper{std::current_exception()};
      }

      if (!awaitable) {
        co_return;
      }

      lock.unlock();

      detail::collect_all_try_range_component_t<
          detail::range_reference_t<InputRange>>
          result;

      try {
        if constexpr (std::is_void_v<result_t>) {
          co_await co_viaIfAsync(
              executor.get_alias(),
              co_withCancellation(cancelToken, std::move(*awaitable)));
          result.emplace();
        } else {
          result.emplace(co_await co_viaIfAsync(
              executor.get_alias(),
              co_withCancellation(cancelToken, std::move(*awaitable))));
        }
      } catch (...) {
        result.emplaceException(std::current_exception());
      }

      lock =
          co_await co_viaIfAsync(executor.get_alias(), mutex.co_scoped_lock());

      try {
        results[thisIndex] = std::move(result);
      } catch (...) {
        results[thisIndex].emplaceException(std::current_exception());
      }
    }
  };

  std::vector<detail::BarrierTask> workerTasks;

  detail::Barrier barrier{1};

  // Save the initial context and restore it after starting each task
  // as the task may have modified the context before suspending and we
  // want to make sure the next task is started with the same initial
  // context.
  const auto context = RequestContext::saveContext();

  auto& asyncFrame = co_await detail::co_current_async_stack_frame;

  try {
    auto lock = co_await mutex.co_scoped_lock();
    while (!iterationException && iter != iterEnd &&
           workerTasks.size() < maxConcurrency) {
      // Unlock the mutex before starting the child operation so that
      // it can consume as many results synchronously as it can before
      // returning here and letting us potentially spawn another task.
      // This can avoid spawning more worker coroutines than is necessary
      // to consume all of the awaitables.
      lock.unlock();

      workerTasks.push_back(makeWorker());
      barrier.add(1);
      workerTasks.back().start(&barrier, asyncFrame);

      RequestContext::setContext(context);

      lock = co_await mutex.co_scoped_lock();
    }
  } catch (...) {
    // Failure to create a worker is an error if we failed
    // to create _any_ workers. As long as we created one then
    // the algorithm should still be able to make forward progress.
    if (workerTasks.empty()) {
      iterationException = exception_wrapper{std::current_exception()};
    }
  }

  co_await detail::UnsafeResumeInlineSemiAwaitable{barrier.arriveAndWait()};

  if (iterationException) {
    co_yield co_error(std::move(iterationException));
  }

  co_return results;
}