in core/src/main/scala/org/apache/pekko/persistence/cassandra/snapshot/CassandraSnapshotStore.scala [208:244]
override def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] = {
session.serverMetaData.flatMap { meta =>
if (meta.isVersion2
|| settings.cosmosDb
|| 0L < criteria.minTimestamp
|| criteria.maxTimestamp < SnapshotSelectionCriteria.latest().maxTimestamp) {
preparedSelectSnapshotMetadata.flatMap { snapshotMetaPs =>
// this meta query gets slower than slower if snapshots are deleted without a criteria.minSequenceNr as
// all previous tombstones are scanned in the meta data query
metadata(snapshotMetaPs, persistenceId, criteria, limit = None).flatMap {
mds: immutable.Seq[SnapshotMetadata] =>
val boundStatementBatches = mds
.map(md =>
preparedDeleteSnapshot.map(_.bind(md.persistenceId, md.sequenceNr: JLong)
.setExecutionProfileName(snapshotSettings.writeProfile)))
.grouped(0xFFFF - 1)
if (boundStatementBatches.nonEmpty) {
Future
.sequence(
boundStatementBatches.map(boundStatements =>
Future
.sequence(boundStatements)
.flatMap(stmts => executeBatch(batch => stmts.foreach(batch.addStatement)))))
.map(_ => ())
} else {
FutureUnit
}
}
}
} else {
val boundDeleteSnapshot = preparedDeleteAllSnapshotsForPidAndSequenceNrBetween.map(
_.bind(persistenceId, criteria.minSequenceNr: JLong, criteria.maxSequenceNr: JLong)
.setExecutionProfileName(snapshotSettings.writeProfile))
boundDeleteSnapshot.flatMap(session.executeWrite(_)).map(_ => ())
}
}
}