in src/Elastic.Channels/BufferedChannelBase.cs [325:372]
/// <summary>
/// Exports <paramref name="items"/> through <c>ExportAsync</c> and keeps re-exporting whatever
/// <c>RetryBuffer</c> hands back, for at most <c>Options.BufferOptions.ExportMaxRetries</c>
/// additional attempts.
/// </summary>
/// <remarks>
/// <para>
/// The loop stops early when cancellation has been requested on <c>TokenSource</c>, when
/// <c>_signal</c> is already set, or when no items remain to be sent.
/// </para>
/// <para>
/// <c>_ongoingExportOperations</c> is incremented on entry. The matching decrement, the
/// <c>ExportBufferCallback</c> invocation and the <c>_signal</c> notification run in a
/// <c>finally</c> block so they still happen when an exception (for example the cancellation
/// raised by <c>Task.Delay</c>) escapes the loop. Such exceptions are not swallowed; they still
/// propagate to the caller.
/// </para>
/// </remarks>
/// <param name="items">The events to export on the first attempt.</param>
/// <param name="buffer">The outbound buffer the items belong to; disposed when this method exits.</param>
private async Task ExportBufferAsync(ArraySegment<TEvent> items, IOutboundBuffer<TEvent> buffer)
{
    Interlocked.Increment(ref _ongoingExportOperations);
    using var outboundBuffer = buffer;
    try
    {
        var maxRetries = Options.BufferOptions.ExportMaxRetries;
        for (var i = 0; i <= maxRetries && items.Count > 0; i++)
        {
            if (TokenSource.Token.IsCancellationRequested) break;
            if (_signal is { IsSet: true }) break;

            _callbacks.ExportItemsAttemptCallback?.Invoke(i, items.Count);
            TResponse? response = null;

            var atEndOfRetries = i == maxRetries;
            try
            {
                response = await ExportAsync(items, TokenSource.Token).ConfigureAwait(false);
                _callbacks.ExportResponseCallback?.Invoke(response,
                    new WriteTrackingBufferEventData { Count = outboundBuffer.Count, DurationSinceFirstWrite = outboundBuffer.DurationSinceFirstWrite });
            }
            catch (Exception e)
            {
                _callbacks.ExportExceptionCallback?.Invoke(e);
                if (atEndOfRetries)
                    break;
                // NOTE(review): response is still null here, so items becomes empty below and the
                // loop ends; a failed attempt is reported but never re-sent. Confirm this is intended.
            }

            // Without a response there is nothing to inspect, so nothing is considered retryable.
            items = response == null
                ? EmptyArraySegments<TEvent>.Empty
                : RetryBuffer(response, items, outboundBuffer);

            // Nothing left to retry: this export is done.
            if (items.Count == 0)
                break;

            // Report the number of retryable items once, after the initial attempt only.
            if (i == 0)
                _callbacks.ExportRetryableCountCallback?.Invoke(items.Count);

            if (atEndOfRetries)
            {
                // Retryable items still exist but the retry budget is exhausted; notify the user.
                _callbacks.ExportMaxRetriesCallback?.Invoke(items);
            }
            else
            {
                // Back off before the next attempt; this throws if the token is cancelled meanwhile.
                await Task.Delay(Options.BufferOptions.ExportBackoffPeriod(i), TokenSource.Token).ConfigureAwait(false);
                _callbacks.ExportRetryCallback?.Invoke(items);
            }
        }
    }
    finally
    {
        // Always balance the increment above, even when the loop exits through an exception,
        // otherwise the in-flight counter would stay elevated.
        Interlocked.Decrement(ref _ongoingExportOperations);
        _callbacks.ExportBufferCallback?.Invoke();
        if (_signal is { IsSet: false })
            _signal.Signal();
    }
}