in flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchWriter.java [262:289]
/**
 * Inspects the response of a completed bulk request, reports item failures to the failure
 * handler, and settles the pending-action counter for the request.
 *
 * <p>Every failed item that carries a non-null cause is wrapped via {@code wrapException}
 * (together with its REST status and the request at the same index) and folded into a single
 * throwable through {@code firstOrSuppressed}. If at least one such failure was collected, it is
 * passed to {@code failureHandler.onFailure}.
 *
 * <p>{@code pendingActions} is reduced by {@code request.numberOfActions()} whenever this method
 * returns normally, including when failures were reported and the handler chose not to throw.
 * Previously the counter was only reduced for failure-free responses, so a tolerated failure
 * left it permanently inflated. NOTE(review): presumably flush/checkpoint logic elsewhere in this
 * class waits for {@code pendingActions} to reach zero - confirm against the caller.
 *
 * @param request the bulk request that was sent
 * @param response the bulk response received for {@code request}
 */
private void extractFailures(BulkRequest request, BulkResponse response) {
    if (response.hasFailures()) {
        // Fetch the items once instead of on every loop iteration.
        final BulkItemResponse[] items = response.getItems();

        Throwable chainedFailures = null;
        for (int i = 0; i < items.length; i++) {
            final BulkItemResponse itemResponse = items[i];
            if (!itemResponse.isFailed()) {
                continue;
            }
            final Throwable failure = itemResponse.getFailure().getCause();
            if (failure == null) {
                // Nothing to report for a failure without a cause.
                continue;
            }
            final RestStatus restStatus = itemResponse.getFailure().getStatus();
            // Response items are matched to their originating request by position.
            final DocWriteRequest<?> actionRequest = request.requests().get(i);
            chainedFailures =
                    firstOrSuppressed(
                            wrapException(restStatus, failure, actionRequest), chainedFailures);
        }

        if (chainedFailures != null) {
            // If the handler throws, the exception propagates and the counter is left
            // untouched, exactly as before this change.
            failureHandler.onFailure(chainedFailures);
        }
    }

    // Reached on every non-throwing path: the request has been fully processed, so its
    // actions are no longer pending.
    pendingActions -= request.numberOfActions();
}