private async Task ExportBufferAsync()

in src/Elastic.Channels/BufferedChannelBase.cs [325:372]


	/// <summary>
	/// Exports <paramref name="items"/> through <c>ExportAsync</c> and keeps re-sending whatever
	/// <c>RetryBuffer</c> returns until nothing is left, <c>ExportMaxRetries</c> is exhausted,
	/// cancellation is requested on <c>TokenSource</c>, or <c>_signal</c> is already set.
	/// </summary>
	/// <remarks>
	/// <c>_ongoingExportOperations</c> is incremented on entry and decremented in a <c>finally</c> block, and
	/// <c>_signal</c> (when present and not yet set) is signalled on every exit path, including when an
	/// exception escapes the loop (for example <see cref="Task.Delay(TimeSpan, CancellationToken)"/> being
	/// cancelled during backoff, or a user callback throwing). Such exceptions still propagate to the caller.
	/// </remarks>
	/// <param name="items">The events to export; replaced after each attempt by the result of <c>RetryBuffer</c>.</param>
	/// <param name="buffer">The outbound buffer reported to the response callback; disposed when this method exits.</param>
	private async Task ExportBufferAsync(ArraySegment<TEvent> items, IOutboundBuffer<TEvent> buffer)
	{
		Interlocked.Increment(ref _ongoingExportOperations);
		// Declared outside the try so disposal still happens last, after the completion callback and signal.
		using var outboundBuffer = buffer;
		try
		{
			var maxRetries = Options.BufferOptions.ExportMaxRetries;
			for (var i = 0; i <= maxRetries && items.Count > 0; i++)
			{
				if (TokenSource.Token.IsCancellationRequested) break;
				if (_signal is { IsSet: true }) break;

				_callbacks.ExportItemsAttemptCallback?.Invoke(i, items.Count);
				TResponse? response = null;

				// delay if we still have items and we are not at the end of the max retry cycle
				var atEndOfRetries = i == maxRetries;
				try
				{
					response = await ExportAsync(items, TokenSource.Token).ConfigureAwait(false);
					_callbacks.ExportResponseCallback?.Invoke(response,
						new WriteTrackingBufferEventData { Count = outboundBuffer.Count, DurationSinceFirstWrite = outboundBuffer.DurationSinceFirstWrite });
				}
				catch (Exception e)
				{
					_callbacks.ExportExceptionCallback?.Invoke(e);
					if (atEndOfRetries)
						break;
				}

				// NOTE(review): when the export threw, response is still null, so items becomes empty and the
				// loop ends on the next condition check; a thrown export is therefore not retried even when
				// retries remain. Confirm this is intended.
				items = response == null
					? EmptyArraySegments<TEvent>.Empty
					: RetryBuffer(response, items, outboundBuffer);
				// only report the retryable count after the very first attempt
				if (items.Count > 0 && i == 0)
					_callbacks.ExportRetryableCountCallback?.Invoke(items.Count);

				if (items.Count > 0 && !atEndOfRetries)
				{
					// Throws if the token is cancelled while backing off; the finally block below still runs.
					await Task.Delay(Options.BufferOptions.ExportBackoffPeriod(i), TokenSource.Token).ConfigureAwait(false);
					_callbacks.ExportRetryCallback?.Invoke(items);
				}
				// otherwise if retryable items still exist and the user wants to be notified
				else if (items.Count > 0 && atEndOfRetries)
					_callbacks.ExportMaxRetriesCallback?.Invoke(items);
			}
		}
		finally
		{
			// Always balance the increment above so the in-flight counter cannot leak on a faulted export.
			Interlocked.Decrement(ref _ongoingExportOperations);
			try
			{
				_callbacks.ExportBufferCallback?.Invoke();
			}
			finally
			{
				// Signal even if the completion callback throws, so a waiter on _signal is not left hanging.
				if (_signal is { IsSet: false })
					_signal.Signal();
			}
		}
	}