in core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/CassandraJournal.scala [717:773]
override def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(
replayCallback: PersistentRepr => Unit): Future[Unit] = {
log.debug("[{}] asyncReplayMessages from [{}] to [{}]", persistenceId, fromSequenceNr, toSequenceNr)
tagRecovery match {
case Some(tr) =>
val recoveryPrep: Future[Map[String, TagProgress]] = {
val scanningSeqNrFut = tr.tagScanningStartingSequenceNr(persistenceId)
for {
tp <- tr.lookupTagProgress(persistenceId)
_ <- tr.setTagProgress(persistenceId, tp)
scanningSeqNr <- scanningSeqNrFut
_ <- sendPreSnapshotTagWrites(scanningSeqNr, fromSequenceNr, persistenceId, max, tp, tr)
} yield tp
}
Source
.futureSource(recoveryPrep.map((tp: Map[Tag, TagProgress]) => {
log.debug(
"[{}] starting recovery with tag progress: [{}]. From [{}] to [{}]",
persistenceId,
tp,
fromSequenceNr,
toSequenceNr)
queries
.eventsByPersistenceId(
persistenceId,
fromSequenceNr,
toSequenceNr,
max,
None,
settings.journalSettings.readProfile,
"asyncReplayMessages",
extractor = Extractors.taggedPersistentRepr(eventDeserializer, serialization))
.mapAsync(1)(tr.sendMissingTagWrite(tp))
}))
.map(te => queries.mapEvent(te.pr))
.runForeach(replayCallback)
.map(_ => ())
case None =>
queries
.eventsByPersistenceId(
persistenceId,
fromSequenceNr,
toSequenceNr,
max,
None,
settings.journalSettings.readProfile,
"asyncReplayMessages",
extractor = Extractors.persistentRepr(eventDeserializer, serialization))
.map(queries.mapEvent)
.runForeach(replayCallback)
.map(_ => ())
}
}