in src/Elastic.Clients.Elasticsearch/_Shared/Core/Extensions/Extensions.cs [233:274]
/// <summary>
/// Enumerates <paramref name="lazyList"/> lazily and starts one <c>ProcessAsync</c> task per item, tracking at most
/// <paramref name="maxDegreeOfParallelism"/> tasks at a time, then reports the outcome through
/// <paramref name="done"/>.
/// </summary>
/// <typeparam name="TSource">Type of the items being enumerated.</typeparam>
/// <typeparam name="TResult">Type of the result produced by <paramref name="taskSelector"/>.</typeparam>
/// <param name="lazyList">The source sequence; it is enumerated exactly once, item by item.</param>
/// <param name="taskSelector">
/// Forwarded to <c>ProcessAsync</c> together with the item and a zero-based sequence number that is incremented
/// for every enumerated item.
/// </param>
/// <param name="resultProcessor">
/// Forwarded to <c>ProcessAsync</c>; presumably invoked with each item and its result (confirm against
/// <c>ProcessAsync</c>).
/// </param>
/// <param name="done">
/// Invoked with <c>null</c> after every task has completed, or with the exception when anything inside the
/// processing loop fails (including cancellation). Because the success call is made inside the guarded region,
/// an exception thrown by <c>done(null)</c> results in a second call carrying that exception.
/// </param>
/// <param name="maxDegreeOfParallelism">
/// Initial and maximum count of the internal semaphore handed to <c>ProcessAsync</c>, and the upper bound on the
/// number of tasks tracked concurrently.
/// </param>
/// <param name="additionalRateLimiter">Optional extra semaphore forwarded as-is to <c>ProcessAsync</c>.</param>
/// <returns>A task that completes once every started task has completed successfully.</returns>
internal static async Task ForEachAsync<TSource, TResult>(
    this IEnumerable<TSource> lazyList,
    Func<TSource, long, Task<TResult>> taskSelector,
    Action<TSource, TResult> resultProcessor,
    Action<Exception> done,
    int maxDegreeOfParallelism,
    SemaphoreSlim additionalRateLimiter = null
)
{
    // NOTE(review): the semaphore is intentionally not disposed here; on the failure path tasks that are still
    // in flight may continue to use it after this method has thrown.
    var semaphore = new SemaphoreSlim(maxDegreeOfParallelism, maxDegreeOfParallelism);
    long page = 0;
    try
    {
        var tasks = new List<Task>(maxDegreeOfParallelism);
        foreach (var item in lazyList)
        {
            tasks.Add(ProcessAsync(item, taskSelector, resultProcessor, semaphore, additionalRateLimiter,
                page++));

            // Keep pulling items until the window of tracked tasks is full.
            if (tasks.Count < maxDegreeOfParallelism)
                continue;

            // Window is full: wait for any task to finish before enumerating the next item.
            var task = await Task.WhenAny(tasks).ConfigureAwait(false);
            if (task.Exception != null && task.IsFaulted && task.Exception.Flatten().InnerExceptions.First() is
                { } e)
            {
                // Rethrow the first underlying exception while preserving its original stack trace.
                ExceptionDispatchInfo.Capture(e).Throw();
                return;
            }

            // A canceled task is neither faulted nor carries an Exception; awaiting it surfaces the
            // OperationCanceledException instead of silently dropping the cancellation and reporting success.
            if (task.IsCanceled)
                await task.ConfigureAwait(false);

            tasks.Remove(task);
        }

        // Drain whatever is still running; this throws if any remaining task faulted or was canceled.
        await Task.WhenAll(tasks).ConfigureAwait(false);
        done(null);
    }
    catch (Exception e)
    {
        // Report the failure to the callback, then let it propagate to the caller unchanged.
        done(e);
        throw;
    }
}