private async Task BulkAsync()

in src/Elastic.Clients.Elasticsearch/_Shared/Helpers/BulkAllObservable.cs [116:206]


	private async Task<BulkAllResponse> BulkAsync(IList<T> buffer, long page, int backOffRetries)
	{
		_compositeCancelToken.ThrowIfCancellationRequested();

		var request = _partitionedBulkRequest;

		var response = await _client.BulkAsync(s =>
		{
			s.RequestConfiguration(x => x.DisableAuditTrail(false));
			s.Index(request.Index);
			s.Timeout(request.Timeout);

			if (request.BufferToBulk is not null)
			{
				request.BufferToBulk(s, buffer);
			}
			else
			{
				s.IndexMany(buffer);
			}

			if (!string.IsNullOrEmpty(request.Pipeline))
				s.Pipeline(request.Pipeline);
			if (request.Routing != null)
				s.Routing(request.Routing);
			if (request.WaitForActiveShards.HasValue)
				s.WaitForActiveShards(request.WaitForActiveShards.ToString());

			switch (_partitionedBulkRequest)
			{
				case IHelperCallable helperCallable when helperCallable.ParentMetaData is not null:
					s.RequestConfiguration(rc => rc.RequestMetaData(helperCallable.ParentMetaData));
					break;

				default:
					s.RequestConfiguration(rc => rc.RequestMetaData(RequestMetaDataFactory.BulkHelperRequestMetaData()));
					break;
			}
		}, _compositeCancelToken).ConfigureAwait(false);

		_compositeCancelToken.ThrowIfCancellationRequested();
		_bulkResponseCallback?.Invoke(response);

		if (!response.ApiCallDetails.HasSuccessfulStatusCode || !response.ApiCallDetails.HasExpectedContentType)
			return await HandleBulkRequestAsync(buffer, page, backOffRetries, response).ConfigureAwait(false);

		var retryableDocuments = new List<T>();
		var droppedDocuments = new List<Tuple<ResponseItem, T>>();

		var retryableDocsRemainingAfterRetriesExceeded = false;

		foreach (var documentWithResponse in response.Items.Zip(buffer, Tuple.Create))
		{
			if (documentWithResponse.Item1.IsValid)
				continue;

			if (_retryPredicate(documentWithResponse.Item1, documentWithResponse.Item2))
			{
				if (backOffRetries < _backOffRetries)
				{
					retryableDocuments.Add(documentWithResponse.Item2);
				}
				else
				{
					// We still have retriable documents but have exceeded all retries, so we mark these as
					// dropped so they get handled correctly.
					retryableDocsRemainingAfterRetriesExceeded = true;
					droppedDocuments.Add(documentWithResponse);
				}
			}
			else
			{
				droppedDocuments.Add(documentWithResponse);
			}
		}

		HandleDroppedDocuments(droppedDocuments, response);

		if (retryableDocsRemainingAfterRetriesExceeded)
		{
			throw ThrowOnBadBulk(response, $"Bulk indexing failed and after retrying {backOffRetries} times.");
		}
		else if (retryableDocuments.Count > 0)
		{
			return await RetryDocumentsAsync(page, ++backOffRetries, retryableDocuments).ConfigureAwait(false);
		}

		request.BackPressure?.Release();

		return new BulkAllResponse { Retries = backOffRetries, Page = page, Items = response.Items };
	}