def deleteBeforeSnapshot()

in core/src/main/scala/org/apache/pekko/persistence/cassandra/cleanup/Cleanup.scala [148:172]


  def deleteBeforeSnapshot(
      persistenceId: String,
      snapshotsToKeep: Int,
      keepAfterUnixTimestamp: Long): Future[Option[SnapshotMetadata]] = {
    require(snapshotsToKeep >= 1, "must keep at least one snapshot")
    require(keepAfterUnixTimestamp >= 0, "keepAfter must be greater than 0")
    selectAllSnapshotMetaPs.futureResult()
      .flatMap { ps =>
        val allRows: Source[Row, NotUsed] = session.select(ps.bind(persistenceId))
        allRows.zipWithIndex
          .takeWhile {
            case (row, index) =>
              if (row.getLong("timestamp") > keepAfterUnixTimestamp) {
                true
              } else if (index < snapshotsToKeep) {
                true
              } else {
                false
              }
          }
          .map(_._1)
          .runWith(Sink.seq)
      }
      .flatMap(rows => issueSnapshotDelete(persistenceId, snapshotsToKeep, rows))
  }