in java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java [298:428]
public void flush() {
List<RetryableBulkOperation<Context>> sentRequests = new ArrayList<>();
RequestExecution<Context> exec = sendRequestCondition.whenReadyIf(
() -> {
// May happen on manual and periodic flushes
return !operations.isEmpty() && operations.stream()
.anyMatch(RetryableBulkOperation::isSendable);
},
() -> {
// Selecting operations that can be sent immediately,
// Dividing actual operations from contexts
List<BulkOperation> immediateOps = new ArrayList<>();
List<Context> contexts = new ArrayList<>();
for(Iterator<RetryableBulkOperation<Context>> it = operations.iterator(); it.hasNext();){
RetryableBulkOperation<Context> op = it.next();
if (op.isSendable()) {
immediateOps.add(op.operation());
contexts.add(op.context());
sentRequests.add(op);
it.remove();
}
}
// Build the request
BulkRequest request = newRequest().operations(immediateOps).build();
// Prepare for next round
currentSize = operations.size();
addCondition.signalIfReady();
long id = sendRequestCondition.invocations();
if (listener != null) {
// synchronous execution to make sure it actually runs before
listener.beforeBulk(id, request, contexts);
}
CompletionStage<BulkResponse> result = client.bulk(request);
requestsInFlightCount++;
if (listener == null) {
// No need to keep the request around, it can be GC'ed
request = null;
}
return new RequestExecution<>(id, request, contexts, result);
});
if (exec != null) {
// A request was actually sent
exec.futureResponse.handle((resp, thr) -> {
if (resp != null) {
// Success? Checking if total or partial
List<BulkResponseItem> failedRequestsCanRetry = resp.items().stream()
.filter(i -> i.error() != null && i.status() == 429)
.collect(Collectors.toList());
if (failedRequestsCanRetry.isEmpty() || backoffPolicy.equals(BackoffPolicy.noBackoff())) {
// Total success! ...or there's no retry policy implemented. Either way, can call
listenerAfterBulkSuccess(resp, exec);
} else {
// Partial success, retrying failed requests if policy allows it
// Keeping list of retryable requests/responses, to exclude them for calling
// listener later
List<RetryableBulkOperation<Context>> retryableReq = new ArrayList<>();
List<RetryableBulkOperation<Context>> refires = new ArrayList<>();
List<BulkResponseItem> retryableResp = new ArrayList<>();
for (BulkResponseItem bulkItemResponse : failedRequestsCanRetry) {
int index = resp.items().indexOf(bulkItemResponse);
selectingRetries(index, bulkItemResponse, sentRequests, retryableResp, retryableReq, refires);
}
// Scheduling flushes for just sent out retryable requests
if (!refires.isEmpty()) {
scheduleRetries(refires);
}
// Retrieving list of remaining successful or not retryable requests
retryableReq.forEach(sentRequests::remove);
if (!sentRequests.isEmpty()) {
if (listener != null) {
// Creating partial BulkRequest
List<BulkOperation> partialOps = new ArrayList<>();
List<Context> partialCtx = new ArrayList<>();
for (RetryableBulkOperation<Context> op : sentRequests) {
partialOps.add(op.operation());
partialCtx.add(op.context());
}
BulkRequest partialRequest = newRequest().operations(partialOps).build();
// Filtering response
List<BulkResponseItem> partialItems = resp.items()
.stream()
.filter(i -> !retryableResp.contains(i))
.collect(Collectors.toList());
BulkResponse partialResp = BulkResponse.of(br -> br
.items(partialItems)
.errors(resp.errors())
.took(resp.took())
.ingestTook(resp.ingestTook()));
listenerInProgressCount.incrementAndGet();
scheduler.submit(() -> {
try {
listener.afterBulk(exec.id, partialRequest, partialCtx, partialResp);
} finally {
if (listenerInProgressCount.decrementAndGet() == 0) {
closeCondition.signalIfReady();
}
}
});
}
}
}
} else {
// Failure
listenerAfterBulkException(thr, exec);
}
sendRequestCondition.signalIfReadyAfter(() -> {
requestsInFlightCount--;
closeCondition.signalAllIfReady();
});
return null;
});
}
}