in core/src/main/scala/org/apache/pekko/persistence/cassandra/cleanup/Cleanup.scala [148:172]
/**
 * Selects the leading snapshot metadata rows of `persistenceId` that should be retained and
 * passes them to `issueSnapshotDelete`.
 *
 * Rows are consumed in the order the snapshot metadata query emits them. A row is retained while
 * either its `timestamp` column is strictly greater than `keepAfterUnixTimestamp`, or fewer than
 * `snapshotsToKeep` rows have been consumed before it. Consumption stops at the first row that
 * satisfies neither condition; that row and everything after it are not part of the retained set.
 *
 * NOTE(review): this presumably relies on the query returning the newest snapshots first, so that
 * the retained prefix is "the most recent ones" - confirm against the statement behind
 * `selectAllSnapshotMetaPs`.
 *
 * @param persistenceId the persistence id bound into the snapshot metadata query
 * @param snapshotsToKeep number of leading rows retained regardless of their timestamp; must be
 *                        at least 1
 * @param keepAfterUnixTimestamp rows whose `timestamp` column is strictly greater than this value
 *                               are retained; must not be negative
 * @return the future returned by `issueSnapshotDelete` for the retained rows
 * @throws IllegalArgumentException if `snapshotsToKeep` is less than 1 or
 *                                  `keepAfterUnixTimestamp` is negative
 */
def deleteBeforeSnapshot(
    persistenceId: String,
    snapshotsToKeep: Int,
    keepAfterUnixTimestamp: Long): Future[Option[SnapshotMetadata]] = {
  require(snapshotsToKeep >= 1, "must keep at least one snapshot")
  // Zero is accepted on purpose; the message states the bound that is actually enforced.
  require(keepAfterUnixTimestamp >= 0, "keepAfter must be greater than or equal to 0")
  selectAllSnapshotMetaPs
    .futureResult()
    .flatMap { ps =>
      val allRows: Source[Row, NotUsed] = session.select(ps.bind(persistenceId))
      allRows.zipWithIndex
        .takeWhile {
          case (row, index) =>
            // Retain while the row is recent enough, or while the minimum count is not yet reached.
            row.getLong("timestamp") > keepAfterUnixTimestamp || index < snapshotsToKeep
        }
        .map { case (row, _) => row }
        .runWith(Sink.seq)
    }
    .flatMap(retained => issueSnapshotDelete(persistenceId, snapshotsToKeep, retained))
}