internal static async Task ForEachAsync()

in src/Elastic.Clients.Elasticsearch/_Shared/Core/Extensions/Extensions.cs [233:274]


	/// <summary>
	/// Enumerates <paramref name="lazyList"/> and starts one <c>ProcessAsync</c> task per item, allowing at most
	/// <paramref name="maxDegreeOfParallelism"/> started tasks to be outstanding before the next item is pulled
	/// from the sequence. Completes once every started task has completed.
	/// </summary>
	/// <param name="lazyList">The items to process. Enumerated once, one item at a time, as capacity frees up.</param>
	/// <param name="taskSelector">Forwarded to <c>ProcessAsync</c> together with the item and its zero-based position in the sequence.</param>
	/// <param name="resultProcessor">Forwarded to <c>ProcessAsync</c> (presumably invoked with each item and its result; confirm against <c>ProcessAsync</c>).</param>
	/// <param name="done">
	/// Invoked with <c>null</c> after all tasks completed successfully, or with the exception that stopped
	/// processing. Because the success call happens inside the guarded region, an exception thrown by
	/// <c>done(null)</c> results in a second invocation with that exception.
	/// </param>
	/// <param name="maxDegreeOfParallelism">Upper bound on outstanding tasks; also the capacity of the semaphore handed to <c>ProcessAsync</c>.</param>
	/// <param name="additionalRateLimiter">Optional extra semaphore forwarded to <c>ProcessAsync</c>.</param>
	/// <returns>A task that faults or cancels with the first observed failure after <paramref name="done"/> has been notified.</returns>
	internal static async Task ForEachAsync<TSource, TResult>(
		this IEnumerable<TSource> lazyList,
		Func<TSource, long, Task<TResult>> taskSelector,
		Action<TSource, TResult> resultProcessor,
		Action<Exception> done,
		int maxDegreeOfParallelism,
		SemaphoreSlim additionalRateLimiter = null
	)
	{
		// NOTE(review): the semaphore is intentionally not disposed here; on the failure path other started
		// tasks may still be running and still hold a reference to it.
		var semaphore = new SemaphoreSlim(maxDegreeOfParallelism, maxDegreeOfParallelism);
		long page = 0;

		try
		{
			var tasks = new List<Task>(maxDegreeOfParallelism);
			foreach (var item in lazyList)
			{
				tasks.Add(ProcessAsync(item, taskSelector, resultProcessor, semaphore, additionalRateLimiter,
					page++));

				// Keep pulling items until the window of outstanding tasks is full.
				if (tasks.Count < maxDegreeOfParallelism)
					continue;

				var completed = await Task.WhenAny(tasks).ConfigureAwait(false);
				if (completed.IsFaulted && completed.Exception is { } aggregate)
				{
					// Surface the first underlying exception (not the AggregateException wrapper) while
					// preserving its original stack trace.
					var failure = aggregate.Flatten().InnerExceptions.First();
					ExceptionDispatchInfo.Capture(failure).Throw();
				}

				// A canceled task carries no Exception, so it would otherwise be dropped silently and the
				// loop would keep going (with a window of 1 the final WhenAll would never see it either).
				// Awaiting the already-completed task rethrows the cancellation, matching what the final
				// Task.WhenAll does for tasks still in the list; for a successful task this is a no-op.
				await completed.ConfigureAwait(false);

				tasks.Remove(completed);
			}

			// Drain whatever is still outstanding once the source is exhausted.
			await Task.WhenAll(tasks).ConfigureAwait(false);
			done(null);
		}
		catch (Exception e)
		{
			done(e);
			throw;
		}
	}