in flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/common/AcknowledgeOnCheckpoint.java [71:96]
public void notifyCheckpointComplete(long checkpointId) {
    // Flatten the acknowledge ids recorded for the completed checkpoint and for every
    // checkpoint with a smaller id into a single list.
    List<ACKID> idsToAcknowledge =
            acknowledgeIdsPerCheckpoint.stream()
                    .filter(entry -> entry.getCheckpointId() <= checkpointId)
                    .map(entry -> entry.getAcknowledgeIds())
                    .flatMap(ids -> ids.stream())
                    .collect(toList());

    // Acknowledge before touching any bookkeeping: if this call throws, the recorded
    // ids and the outstanding counter are left exactly as they were.
    acknowledger.acknowledge(idsToAcknowledge);

    // Drop everything that was just acknowledged; only entries belonging to checkpoints
    // newer than the completed one remain.
    acknowledgeIdsPerCheckpoint =
            acknowledgeIdsPerCheckpoint.stream()
                    .filter(entry -> checkpointId < entry.getCheckpointId())
                    .collect(toList());

    // Replace the counter with a fresh instance reflecting the ids that are still pending.
    int stillOutstanding = numberOfAcknowledgementIds(acknowledgeIdsPerCheckpoint);
    outstandingAcknowledgements = new AtomicInteger(stillOutstanding);
}