public void notifyCheckpointComplete()

in flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/common/AcknowledgeOnCheckpoint.java [71:96]


    public void notifyCheckpointComplete(long checkpointId) {
        /*
         * Three steps, in this order:
         *   1. collect the acknowledge ids stored for the given checkpoint and all earlier ones,
         *   2. hand them to the acknowledger,
         *   3. drop those checkpoints from the bookkeeping and rebuild the outstanding counter.
         * The bookkeeping is only touched after acknowledge(...) has returned.
         */
        List<ACKID> completedIds =
                acknowledgeIdsPerCheckpoint.stream()
                        .filter(entry -> checkpointId >= entry.getCheckpointId())
                        .map(entry -> entry.getAcknowledgeIds())
                        .flatMap(ids -> ids.stream())
                        .collect(toList());

        acknowledger.acknowledge(completedIds);

        // Retain only the entries that belong to checkpoints newer than the given one.
        acknowledgeIdsPerCheckpoint =
                acknowledgeIdsPerCheckpoint.stream()
                        .filter(entry -> checkpointId < entry.getCheckpointId())
                        .collect(toList());

        // The counter is replaced by a new instance seeded from the retained entries.
        int stillOutstanding = numberOfAcknowledgementIds(acknowledgeIdsPerCheckpoint);
        outstandingAcknowledgements = new AtomicInteger(stillOutstanding);
    }