public void afterBulk()

in flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java [430:479]


        public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
            if (response.hasFailures()) {
                BulkItemResponse itemResponse;
                Throwable failure;
                RestStatus restStatus;
                DocWriteRequest actionRequest;

                try {
                    for (int i = 0; i < response.getItems().length; i++) {
                        itemResponse = response.getItems()[i];
                        failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResponse);
                        if (failure != null) {
                            restStatus = itemResponse.getFailure().getStatus();
                            actionRequest = request.requests().get(i);
                            if (restStatus == null) {
                                if (actionRequest instanceof ActionRequest) {
                                    failureHandler.onFailure(
                                            (ActionRequest) actionRequest,
                                            failure,
                                            -1,
                                            failureRequestIndexer);
                                } else {
                                    throw new UnsupportedOperationException(
                                            "The sink currently only supports ActionRequests");
                                }
                            } else {
                                if (actionRequest instanceof ActionRequest) {
                                    failureHandler.onFailure(
                                            (ActionRequest) actionRequest,
                                            failure,
                                            restStatus.getStatus(),
                                            failureRequestIndexer);
                                } else {
                                    throw new UnsupportedOperationException(
                                            "The sink currently only supports ActionRequests");
                                }
                            }
                        }
                    }
                } catch (Throwable t) {
                    // fail the sink and skip the rest of the items
                    // if the failure handler decides to throw an exception
                    failureThrowable.compareAndSet(null, t);
                }
            }

            if (flushOnCheckpoint) {
                numPendingRequests.getAndAdd(-request.numberOfActions());
            }
        }