in flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriter.java [118:140]
protected void submitRequestEntries(
        List<Operation> requestEntries, ResultHandler<Operation> resultHandler) {
    // Sends the given entries to Elasticsearch as one bulk request and routes the
    // asynchronous outcome to the matching handler:
    //   - transport/client failure      -> handleFailedRequest
    //   - response reporting errors     -> handlePartiallyFailedRequest
    //   - response without errors       -> handleSuccessfulRequest
    //
    // requestEntries: the operations to include in the bulk request.
    // resultHandler:  passed through to the handlers so they can report the outcome.

    // One submission is counted per call, regardless of how many entries it carries.
    numRequestSubmittedCounter.inc();
    LOG.debug("submitRequestEntries with {} items", requestEntries.size());

    // Wrap every entry's variant in a BulkOperation and append it to the request.
    final BulkRequest.Builder bulkBuilder = new BulkRequest.Builder();
    requestEntries.forEach(
            entry -> bulkBuilder.operations(new BulkOperation(entry.getBulkOperationVariant())));
    final BulkRequest bulkRequest = bulkBuilder.build();

    // NOTE(review): the stage returned by whenComplete is not retained, so an exception
    // thrown by one of the handlers below would go unobserved here — confirm the handlers
    // always report through resultHandler.
    esClient.bulk(bulkRequest)
            .whenComplete(
                    (bulkResponse, failure) -> {
                        if (failure != null) {
                            // The request as a whole failed; no response is available.
                            handleFailedRequest(requestEntries, resultHandler, failure);
                            return;
                        }
                        if (!bulkResponse.errors()) {
                            handleSuccessfulRequest(resultHandler, bulkResponse);
                            return;
                        }
                        // The response flags errors; let the handler inspect it.
                        handlePartiallyFailedRequest(
                                requestEntries, resultHandler, bulkResponse);
                    });
}