in flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriter.java [244:271]
/**
 * Inspects the response of a completed bulk request and surfaces item-level failures.
 *
 * <p>If the response reports no failures, or reports failures of which none carries a cause,
 * the number of actions in {@code request} is subtracted from {@code pendingActions} and the
 * method returns normally. Otherwise every failed item that carries a cause is wrapped via
 * {@code wrapException} and combined via {@code firstOrSuppressed}; the combined throwable is
 * rethrown wrapped in a {@link FlinkRuntimeException}.
 *
 * @param request the bulk request that was executed
 * @param response the response received for {@code request}
 * @throws FlinkRuntimeException if at least one failed item carries a non-null cause
 */
private void extractFailures(BulkRequest request, BulkResponse response) {
    if (!response.hasFailures()) {
        pendingActions -= request.numberOfActions();
        return;
    }

    // Fetch the item array once instead of on every loop iteration.
    final BulkItemResponse[] itemResponses = response.getItems();
    Throwable chainedFailures = null;
    for (int i = 0; i < itemResponses.length; i++) {
        final BulkItemResponse itemResponse = itemResponses[i];
        if (!itemResponse.isFailed()) {
            continue;
        }
        final Throwable failure = itemResponse.getFailure().getCause();
        if (failure == null) {
            // A failed item without a cause cannot be wrapped; it is skipped.
            continue;
        }
        final RestStatus restStatus = itemResponse.getFailure().getStatus();
        // Uses the same index for the response item and the originating request.
        // NOTE(review): assumes response items are positionally aligned with
        // request.requests() - confirm against the Elasticsearch bulk API contract.
        final DocWriteRequest<?> actionRequest = request.requests().get(i);
        chainedFailures =
                firstOrSuppressed(
                        wrapException(restStatus, failure, actionRequest), chainedFailures);
    }

    if (chainedFailures == null) {
        // The bulk request completed and nothing is thrown, so its actions must not stay
        // counted as pending. Without this, the counter would remain inflated on this path
        // while the no-failure path above decrements it.
        // NOTE(review): presumably a flush waits for pendingActions to reach zero - confirm.
        pendingActions -= request.numberOfActions();
        return;
    }
    throw new FlinkRuntimeException(chainedFailures);
}