in src/Elastic.Clients.Elasticsearch/_Shared/Helpers/BulkAllObservable.cs [220:257]
/// <summary>
/// Decides what to do after a <c>_bulk</c> call came back as failed: either halt by
/// throwing the exception built by <c>ThrowOnBadBulk</c>, or resubmit the same buffer
/// through <c>RetryDocumentsAsync</c> with the retry counter advanced by one.
/// </summary>
/// <param name="buffer">The documents of the failed request; handed unchanged to the retry.</param>
/// <param name="page">The page number of the failed request; handed unchanged to the retry.</param>
/// <param name="backOffRetries">Number of retries already spent; compared against <c>_backOffRetries</c>.</param>
/// <param name="response">The failed bulk response whose call details are inspected.</param>
/// <returns>The result of <c>RetryDocumentsAsync</c> when the failure is retried.</returns>
private async Task<BulkAllResponse> HandleBulkRequestAsync(IList<T> buffer, long page, int backOffRetries, BulkResponse response)
{
    var transportException = response.ApiCallDetails.OriginalException as TransportException;
    var failure = transportException?.FailureReason;

    // When there is no transport exception (or no failure reason) the messages report BadRequest.
    var failureName = failure?.GetStringValue() ?? nameof(PipelineFailure.BadRequest);

    switch (failure)
    {
        // Failures that are never retried: halt straight away.
        case PipelineFailure.CouldNotStartSniffOnStartup:
        case PipelineFailure.BadAuthentication:
        case PipelineFailure.NoNodesAttempted:
        case PipelineFailure.SniffFailure:
        case PipelineFailure.Unexpected:
        {
            throw ThrowOnBadBulk(response, $"{nameof(BulkAll)} halted after {nameof(PipelineFailure)}.{failureName} from _bulk");
        }

        // Retryable unless the last audit entry says every node was already tried.
        case PipelineFailure.MaxRetriesReached:
        {
            var lastAudit = response.ApiCallDetails.AuditTrail.Last();
            if (lastAudit.Event == AuditEvent.FailedOverAllNodes)
            {
                throw ThrowOnBadBulk(response, $"{nameof(BulkAll)} halted after attempted bulk failed over all the active nodes");
            }

            break;
        }
    }

    // Everything that reaches this point (BadResponse, PingFailure, MaxTimeoutReached, BadRequest,
    // a missing failure reason, any other value, or MaxRetriesReached without a full fail-over)
    // is retried, provided the retry budget has not been used up yet.
    if (backOffRetries >= _backOffRetries)
    {
        // The message reports the number of retries spent so far, i.e. the value before incrementing.
        throw ThrowOnBadBulk(response,
            $"{nameof(BulkAll)} halted after {nameof(PipelineFailure)}.{failureName} from _bulk and exhausting retries ({backOffRetries})");
    }

    var nextRetry = backOffRetries + 1;
    return await RetryDocumentsAsync(page, nextRetry, buffer).ConfigureAwait(false);
}