in core/src/main/scala/org/apache/pekko/persistence/cassandra/cleanup/Cleanup.scala [88:127]
/**
 * Issues the `deleteSnapshotsBefore` statement for `persistenceId`, bounded by the sequence number
 * of the last row in `rows`, or only logs that statement when `dryRun` is set.
 *
 * The last element of `rows` is treated as the oldest snapshot to keep.
 * NOTE(review): this presumes the caller passes the rows newest-first and limited to
 * `maxToKeep` entries — confirm against the caller.
 *
 * @param persistenceId persistence id whose snapshots are being cleaned up
 * @param maxToKeep number of snapshots to retain; nothing is deleted when `rows` holds fewer than this
 * @param rows snapshot rows; the `sequence_nr` and `timestamp` columns are read from them
 * @return `None` when `rows` is empty, otherwise the metadata built from the last row of `rows`;
 *         in the non-dry-run delete path the future completes after the write has completed
 */
private def issueSnapshotDelete(
    persistenceId: String,
    maxToKeep: Long,
    rows: Seq[Row]): Future[Option[SnapshotMetadata]] = {

  // Builds the metadata for the snapshot stored in the given row.
  def metadataOf(row: Row): SnapshotMetadata =
    SnapshotMetadata(persistenceId, row.getLong("sequence_nr"), row.getLong("timestamp"))

  log.debug("issueSnapshotDelete [{}] [{}] [{}]", persistenceId, maxToKeep, rows.size)
  rows match {
    case Nil =>
      log.debug("persistence id [{}] has 0 snapshots, no deletes issued", persistenceId)
      Future.successful(None)

    case fewer if fewer.size < maxToKeep =>
      // no delete required, return the oldest snapshot
      log.debug("Fewer snapshots than requested for persistence id [{}], no deletes issued", persistenceId)
      Future.successful(Some(metadataOf(fewer.last)))

    case more =>
      // Guarded because the column reads below would otherwise run even with debug logging off.
      if (log.isDebugEnabled) {
        log.debug(
          "Latest {} snapshots for persistence id [{}] range from {} to {}",
          maxToKeep,
          persistenceId,
          more.head.getLong("sequence_nr"),
          more.last.getLong("sequence_nr"))
      }
      // The last row marks the boundary passed to the delete statement.
      val result = metadataOf(more.last)
      if (dryRun) {
        // Dry run: report the statement and its bind values without touching the database.
        log.info(
          "dry run: CQL: [{}] persistence_id: [{}] sequence_nr [{}]",
          statements.deleteSnapshotsBefore,
          persistenceId,
          result.sequenceNr)
        Future.successful(Some(result))
      } else {
        session
          .executeWrite(statements.deleteSnapshotsBefore, persistenceId, result.sequenceNr: JLong)
          .map(_ => Some(result))
      }
  }
}