in core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/CassandraJournal.scala [775:815]
private def sendPreSnapshotTagWrites(
minProgressNr: Long,
fromSequenceNr: Long,
pid: String,
max: Long,
tp: Map[Tag, TagProgress],
tr: CassandraTagRecovery): Future[Done] = {
if (minProgressNr < fromSequenceNr) {
val scanTo = fromSequenceNr - 1
log.debug(
"[{}], Scanning events before snapshot to recover tag_views: From: [{}] to: [{}]",
pid,
minProgressNr,
scanTo)
queries
.eventsByPersistenceId(
pid,
minProgressNr,
scanTo,
max,
None,
settings.journalSettings.readProfile,
"asyncReplayMessagesPreSnapshot",
Extractors.optionalTaggedPersistentRepr(eventDeserializer, serialization))
.mapAsync(1) { t =>
t.tagged match {
case OptionVal.Some(tpr) =>
tr.sendMissingTagWrite(tp)(tpr)
case OptionVal.None => FutureDone // no tags, skip
}
}
.runWith(Sink.ignore)
} else {
log.debug(
"[{}] Recovery is starting before the latest tag writes tag progress. Min progress [{}]. From sequence nr of recovery: [{}]",
pid,
minProgressNr,
fromSequenceNr)
FutureDone
}
}