in src/Elastic.Channels/BufferedChannelBase.cs [289:323]
protected virtual ArraySegment<TEvent> RetryBuffer(TResponse response, ArraySegment<TEvent> currentBuffer, IWriteTrackingBuffer statistics)
{
    // Base behaviour: none of the arguments are inspected and the
    // EmptyArraySegments<TEvent>.Empty value is always returned.
    // The member is virtual, so a derived channel may override it to return
    // a different segment (presumably the events that should be retried -
    // confirm against the call site).
    return EmptyArraySegments<TEvent>.Empty;
}
/// <summary>
/// Reads buffers from <c>OutChannel</c> and starts an export task for each one until the
/// channel reader reports no more data, the token is cancelled, or <c>_signal</c> is set.
/// Afterwards it awaits the export tasks still tracked in <c>_taskList</c>, cancels
/// <c>_exitCancelSource</c> and invokes the outbound-exited callback.
/// </summary>
/// <returns>A task that completes once the shutdown sequence described above has run.</returns>
private async Task ConsumeOutboundEventsAsync()
{
    _callbacks.OutboundChannelStartedCallback?.Invoke();
    _taskList = new List<Task>(MaxConcurrency * 2);

    try
    {
        while (await OutChannel.Reader.WaitToReadAsync().ConfigureAwait(false))
        {
            // Let anyone blocked on the first outbound read continue.
            if (_waitForOutboundRead is { IsSet: false })
                _waitForOutboundRead.Signal();

            if (TokenSource.Token.IsCancellationRequested) break;
            if (_signal is { IsSet: true }) break;

            // Drain everything that is currently available before waiting again.
            while (OutChannel.Reader.TryRead(out var buffer))
            {
                var items = buffer.GetArraySegment();

                await _throttleExportTasks.WaitAsync(TokenSource.Token).ConfigureAwait(false);
                try
                {
                    var exportTask = ExportBufferAsync(items, buffer);
                    _taskList.Add(exportTask);

                    // Once MaxConcurrency exports are tracked, wait for one to complete
                    // and stop tracking it before reading the next buffer.
                    if (_taskList.Count >= MaxConcurrency)
                    {
                        var completedTask = await Task.WhenAny(_taskList).ConfigureAwait(false);
                        _taskList.Remove(completedTask);
                    }
                }
                finally
                {
                    // Always hand the throttle slot back, even if the code above throws.
                    _throttleExportTasks.Release();
                }
            }
        }
    }
    catch (OperationCanceledException) when (TokenSource.Token.IsCancellationRequested)
    {
        // The throttle wait observes the token and throws on cancellation. The loop
        // above already treats a cancelled token as a normal exit, so fall through
        // to the shutdown sequence instead of skipping it.
    }

    await Task.WhenAll(_taskList).ConfigureAwait(false);
    _exitCancelSource.Cancel();
    _callbacks.OutboundChannelExitedCallback?.Invoke();
}