in flink-connector-opensearch/src/main/java/org/apache/flink/streaming/connectors/opensearch/OpensearchSink.java [525:574]
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
if (response.hasFailures()) {
BulkItemResponse itemResponse;
Throwable failure;
RestStatus restStatus;
DocWriteRequest actionRequest;
try {
for (int i = 0; i < response.getItems().length; i++) {
itemResponse = response.getItems()[i];
failure = extractFailureCauseFromBulkItemResponse(itemResponse);
if (failure != null) {
restStatus = itemResponse.getFailure().getStatus();
actionRequest = request.requests().get(i);
if (restStatus == null) {
if (actionRequest instanceof ActionRequest) {
failureHandler.onFailure(
(ActionRequest) actionRequest,
failure,
-1,
failureRequestIndexer);
} else {
throw new UnsupportedOperationException(
"The sink currently only supports ActionRequests");
}
} else {
if (actionRequest instanceof ActionRequest) {
failureHandler.onFailure(
(ActionRequest) actionRequest,
failure,
restStatus.getStatus(),
failureRequestIndexer);
} else {
throw new UnsupportedOperationException(
"The sink currently only supports ActionRequests");
}
}
}
}
} catch (Throwable t) {
// fail the sink and skip the rest of the items
// if the failure handler decides to throw an exception
failureThrowable.compareAndSet(null, t);
}
}
if (flushOnCheckpoint) {
numPendingRequests.getAndAdd(-request.numberOfActions());
}
}