in core/src/main/scala/org/apache/pekko/persistence/cassandra/cleanup/Cleanup.scala [343:372]
/**
 * Applies `pidOperation` to every id in `persistenceIds`, strictly one after another
 * and in the given order: the operation for the next id is only started once the
 * `Future` of the previous one has completed successfully.
 *
 * The first failing operation stops the iteration; the remaining ids are not processed
 * and the returned `Future` is failed with that exception. Start, periodic progress
 * (every `logProgressEvery` ids) and completion/failure are written to `log`.
 *
 * @param persistenceIds the ids to process
 * @param operationName  label used in the log messages only
 * @param pidOperation   the asynchronous operation to run for a single id
 * @return a `Future` completed with `Done` when all operations succeeded, or failed
 *         with the exception of the first operation that failed
 */
private def foreach(
    persistenceIds: immutable.Seq[String],
    operationName: String,
    pidOperation: String => Future[Done]): Future[Done] = {
  val size = persistenceIds.size
  log.info("Cleanup started {} of [{}] persistenceId.", operationName, size)

  // The very first operation is invoked outside of any Future combinator, so a synchronous
  // throw would otherwise escape this method and bypass the failure logging below. Convert
  // it to a failed Future so that every id is handled the same way.
  def run(pid: String): Future[Done] =
    try pidOperation(pid)
    catch { case scala.util.control.NonFatal(e) => Future.failed(e) }

  def loop(remaining: List[String], n: Int): Future[Done] =
    remaining match {
      case Nil => Future.successful(Done)
      case pid :: tail =>
        run(pid).flatMap { _ =>
          // Guard against a non-positive interval: `n % 0` would throw and fail the
          // whole cleanup merely because of a logging setting.
          if (logProgressEvery > 0 && n % logProgressEvery == 0)
            log.info("Cleanup {} [{}] of [{}].", operationName, n, size)
          // Recursion happens inside the flatMap callback, i.e. only after the
          // current operation has completed successfully.
          loop(tail, n + 1)
        }
    }

  val result = loop(persistenceIds.toList, n = 1)
  // Logging is a side effect only; callers receive `result` itself.
  result.onComplete {
    case Success(_) =>
      log.info("Cleanup completed {} of [{}] persistenceId.", operationName, size)
    case Failure(e) =>
      log.error(e, "Cleanup {} failed.", operationName)
  }
  result
}