in src/main/scala/org/apache/pekko/persistence/dynamodb/journal/DynamoDBRecovery.scala [215:228]
override def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(
replayCallback: (PersistentRepr) => Unit): Future[Unit] = {
logFailure(s"replay for $persistenceId ($fromSequenceNr to $toSequenceNr)") {
log.debug("starting replay for {} from {} to {} (max {})", persistenceId, fromSequenceNr, toSequenceNr, max)
eventsStream(
persistenceId = persistenceId,
fromSequenceNr = fromSequenceNr,
toSequenceNr = toSequenceNr,
max = max)
.runFold(0) { (count, next) => replayCallback(next); count + 1 }
.map(count => log.debug("replay finished for {} with {} events", persistenceId, count))
}
}