in flink-connector/flink-connector-gcp-pubsub/src/main/java/com/google/pubsub/flink/internal/source/reader/PubSubAckTracker.java [57:63]
/**
 * Acknowledges every consumer stored in {@code checkpoints} under a key that is less than or
 * equal to {@code checkpointId}, removing those entries from the map.
 *
 * <p>The method is {@code synchronized}, so the instance monitor is held for the whole call,
 * including the {@code ack()} invocations.
 *
 * @param checkpointId highest checkpoint id whose collected consumers should be acknowledged
 */
public synchronized void notifyCheckpointComplete(long checkpointId) {
  final List<AckReplyConsumer> readyToAck = new ArrayList<>();
  // Drain entries starting from the first key; assumes firstKey() yields the smallest
  // checkpoint id still tracked (sorted-map semantics), so the loop can stop at the first
  // key that exceeds checkpointId.
  for (;;) {
    if (checkpoints.isEmpty() || checkpoints.firstKey() > checkpointId) {
      break;
    }
    readyToAck.addAll(checkpoints.remove(checkpoints.firstKey()));
  }
  // Ack only after all qualifying entries have been removed from the map.
  for (AckReplyConsumer pending : readyToAck) {
    pending.ack();
  }
}