in src/Elastic.Clients.Elasticsearch/_Shared/Helpers/BulkAllObservable.cs [116:206]
private async Task<BulkAllResponse> BulkAsync(IList<T> buffer, long page, int backOffRetries)
{
_compositeCancelToken.ThrowIfCancellationRequested();
var request = _partitionedBulkRequest;
var response = await _client.BulkAsync(s =>
{
s.RequestConfiguration(x => x.DisableAuditTrail(false));
s.Index(request.Index);
s.Timeout(request.Timeout);
if (request.BufferToBulk is not null)
{
request.BufferToBulk(s, buffer);
}
else
{
s.IndexMany(buffer);
}
if (!string.IsNullOrEmpty(request.Pipeline))
s.Pipeline(request.Pipeline);
if (request.Routing != null)
s.Routing(request.Routing);
if (request.WaitForActiveShards.HasValue)
s.WaitForActiveShards(request.WaitForActiveShards.ToString());
switch (_partitionedBulkRequest)
{
case IHelperCallable helperCallable when helperCallable.ParentMetaData is not null:
s.RequestConfiguration(rc => rc.RequestMetaData(helperCallable.ParentMetaData));
break;
default:
s.RequestConfiguration(rc => rc.RequestMetaData(RequestMetaDataFactory.BulkHelperRequestMetaData()));
break;
}
}, _compositeCancelToken).ConfigureAwait(false);
_compositeCancelToken.ThrowIfCancellationRequested();
_bulkResponseCallback?.Invoke(response);
if (!response.ApiCallDetails.HasSuccessfulStatusCode || !response.ApiCallDetails.HasExpectedContentType)
return await HandleBulkRequestAsync(buffer, page, backOffRetries, response).ConfigureAwait(false);
var retryableDocuments = new List<T>();
var droppedDocuments = new List<Tuple<ResponseItem, T>>();
var retryableDocsRemainingAfterRetriesExceeded = false;
foreach (var documentWithResponse in response.Items.Zip(buffer, Tuple.Create))
{
if (documentWithResponse.Item1.IsValid)
continue;
if (_retryPredicate(documentWithResponse.Item1, documentWithResponse.Item2))
{
if (backOffRetries < _backOffRetries)
{
retryableDocuments.Add(documentWithResponse.Item2);
}
else
{
// We still have retriable documents but have exceeded all retries, so we mark these as
// dropped so they get handled correctly.
retryableDocsRemainingAfterRetriesExceeded = true;
droppedDocuments.Add(documentWithResponse);
}
}
else
{
droppedDocuments.Add(documentWithResponse);
}
}
HandleDroppedDocuments(droppedDocuments, response);
if (retryableDocsRemainingAfterRetriesExceeded)
{
throw ThrowOnBadBulk(response, $"Bulk indexing failed and after retrying {backOffRetries} times.");
}
else if (retryableDocuments.Count > 0)
{
return await RetryDocumentsAsync(page, ++backOffRetries, retryableDocuments).ConfigureAwait(false);
}
request.BackPressure?.Release();
return new BulkAllResponse { Retries = backOffRetries, Page = page, Items = response.Items };
}