flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6BulkProcessorIndexer.java [48:82]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
            BulkProcessor bulkProcessor,
            boolean flushOnCheckpoint,
            AtomicLong numPendingRequestsRef) {
        this.bulkProcessor = checkNotNull(bulkProcessor);
        this.flushOnCheckpoint = flushOnCheckpoint;
        this.numPendingRequestsRef = checkNotNull(numPendingRequestsRef);
    }

    /**
     * Forwards every given delete request to the underlying bulk processor, in array order.
     *
     * <p>When {@code flushOnCheckpoint} is enabled, the pending-request counter is incremented
     * once per request, immediately before that request is handed to the bulk processor.
     *
     * @param deleteRequests the delete requests to enqueue
     */
    @Override
    public void add(DeleteRequest... deleteRequests) {
        final int total = deleteRequests.length;
        for (int i = 0; i < total; i++) {
            // Count first, then submit: the counter is bumped before the request is handed over.
            if (flushOnCheckpoint) {
                numPendingRequestsRef.incrementAndGet();
            }
            bulkProcessor.add(deleteRequests[i]);
        }
    }

    /**
     * Forwards every given index request to the underlying bulk processor, in array order.
     *
     * <p>When {@code flushOnCheckpoint} is enabled, the pending-request counter is incremented
     * once per request, immediately before that request is handed to the bulk processor.
     *
     * @param indexRequests the index requests to enqueue
     */
    @Override
    public void add(IndexRequest... indexRequests) {
        final int total = indexRequests.length;
        for (int i = 0; i < total; i++) {
            // Count first, then submit: the counter is bumped before the request is handed over.
            if (flushOnCheckpoint) {
                numPendingRequestsRef.incrementAndGet();
            }
            bulkProcessor.add(indexRequests[i]);
        }
    }

    @Override
    public void add(UpdateRequest... updateRequests) {
        for (UpdateRequest updateRequest : updateRequests) {
            if (flushOnCheckpoint) {
                numPendingRequestsRef.getAndIncrement();
            }
            this.bulkProcessor.add(updateRequest);
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch7/Elasticsearch7BulkProcessorIndexer.java [48:82]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
            BulkProcessor bulkProcessor,
            boolean flushOnCheckpoint,
            AtomicLong numPendingRequestsRef) {
        this.bulkProcessor = checkNotNull(bulkProcessor);
        this.flushOnCheckpoint = flushOnCheckpoint;
        this.numPendingRequestsRef = checkNotNull(numPendingRequestsRef);
    }

    /**
     * Forwards every given delete request to the underlying bulk processor, in array order.
     *
     * <p>When {@code flushOnCheckpoint} is enabled, the pending-request counter is incremented
     * once per request, immediately before that request is handed to the bulk processor.
     *
     * @param deleteRequests the delete requests to enqueue
     */
    @Override
    public void add(DeleteRequest... deleteRequests) {
        final int total = deleteRequests.length;
        for (int i = 0; i < total; i++) {
            // Count first, then submit: the counter is bumped before the request is handed over.
            if (flushOnCheckpoint) {
                numPendingRequestsRef.incrementAndGet();
            }
            bulkProcessor.add(deleteRequests[i]);
        }
    }

    /**
     * Forwards every given index request to the underlying bulk processor, in array order.
     *
     * <p>When {@code flushOnCheckpoint} is enabled, the pending-request counter is incremented
     * once per request, immediately before that request is handed to the bulk processor.
     *
     * @param indexRequests the index requests to enqueue
     */
    @Override
    public void add(IndexRequest... indexRequests) {
        final int total = indexRequests.length;
        for (int i = 0; i < total; i++) {
            // Count first, then submit: the counter is bumped before the request is handed over.
            if (flushOnCheckpoint) {
                numPendingRequestsRef.incrementAndGet();
            }
            bulkProcessor.add(indexRequests[i]);
        }
    }

    @Override
    public void add(UpdateRequest... updateRequests) {
        for (UpdateRequest updateRequest : updateRequests) {
            if (flushOnCheckpoint) {
                numPendingRequestsRef.getAndIncrement();
            }
            this.bulkProcessor.add(updateRequest);
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



