in flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriter.java [158:178]
private void handlePartiallyFailedRequest(
List<Operation> requestEntries,
ResultHandler<Operation> resultHandler,
BulkResponse response) {
LOG.debug("The BulkRequest has failed partially. Response: {}", response);
ArrayList<Operation> failedItems = new ArrayList<>();
for (int i = 0; i < response.items().size(); i++) {
if (response.items().get(i).error() != null) {
failedItems.add(requestEntries.get(i));
}
}
numRecordsOutErrorsCounter.inc(failedItems.size());
numRecordsSendPartialFailureCounter.inc(failedItems.size());
LOG.info(
"The BulkRequest with {} operation(s) has {} failure(s). It took {}ms",
requestEntries.size(),
failedItems.size(),
response.took());
resultHandler.retryForEntries(failedItems);
}