private async Task HandleBulkRequestAsync()

in src/Elastic.Clients.Elasticsearch/_Shared/Helpers/BulkAllObservable.cs [220:257]


	private async Task<BulkAllResponse> HandleBulkRequestAsync(IList<T> buffer, long page, int backOffRetries, BulkResponse response)
	{
		var clientException = response.ApiCallDetails.OriginalException as TransportException;
		var failureReason = clientException?.FailureReason;
		var reason = failureReason?.GetStringValue() ?? nameof(PipelineFailure.BadRequest);
		switch (failureReason)
		{
			case PipelineFailure.MaxRetriesReached:
				if (response.ApiCallDetails.AuditTrail.Last().Event == AuditEvent.FailedOverAllNodes)
					throw ThrowOnBadBulk(response, $"{nameof(BulkAll)} halted after attempted bulk failed over all the active nodes");

				ThrowOnExhaustedRetries();
				return await RetryDocumentsAsync(page, ++backOffRetries, buffer).ConfigureAwait(false);

			case PipelineFailure.CouldNotStartSniffOnStartup:
			case PipelineFailure.BadAuthentication:
			case PipelineFailure.NoNodesAttempted:
			case PipelineFailure.SniffFailure:
			case PipelineFailure.Unexpected:
				throw ThrowOnBadBulk(response, $"{nameof(BulkAll)} halted after {nameof(PipelineFailure)}.{reason} from _bulk");
			case PipelineFailure.BadResponse:
			case PipelineFailure.PingFailure:
			case PipelineFailure.MaxTimeoutReached:
			case PipelineFailure.BadRequest:
			default:
				ThrowOnExhaustedRetries();
				return await RetryDocumentsAsync(page, ++backOffRetries, buffer).ConfigureAwait(false);
		}

		void ThrowOnExhaustedRetries()
		{
			if (backOffRetries < _backOffRetries)
				return;

			throw ThrowOnBadBulk(response,
				$"{nameof(BulkAll)} halted after {nameof(PipelineFailure)}.{reason} from _bulk and exhausting retries ({backOffRetries})");
		}
	}