in core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/CassandraJournal.scala [443:487]
/**
 * Reads the highest stored sequence number for `persistenceId` and, when tag recovery is
 * configured, prepares tag progress for the recovering persistent actor.
 *
 * If a write is registered as in progress for the persistence id, the read is chained after
 * that write's future so it only starts once the write has completed.
 *
 * With tag recovery present, the persistent actor is announced via
 * `sendPersistentActorStarting` and then one of the following happens:
 *  - `seqNr == fromSequenceNr` and non-zero: the tag progress is looked up, stored, and
 *    pre-snapshot tag writes are sent (no replay is expected, so this is done here);
 *  - `seqNr == 0`: an empty tag progress is stored for the new persistence id;
 *  - otherwise: nothing extra is done.
 *
 * @param persistenceId  id of the persistent actor being recovered
 * @param fromSequenceNr lower-bound hint passed on to the internal read; also compared with
 *                       the result to detect the "snapshot is current" case
 * @return a future completed with the highest sequence number once the tag recovery steps
 *         (if any) have finished; it fails if any of the chained futures fails
 */
override def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = {
  log.debug("[{}] asyncReadHighestSequenceNr [{}] [{}]", persistenceId, fromSequenceNr, sender())
  val highestSequenceNr = writeInProgress.get(persistenceId) match {
    case null =>
      // No write registered for this persistence id: read straight away.
      asyncReadHighestSequenceNrInternal(persistenceId, fromSequenceNr)
    case f =>
      // Chain the read after the pending write's future.
      f.flatMap(_ => asyncReadHighestSequenceNrInternal(persistenceId, fromSequenceNr))
  }
  val toReturn = tagRecovery match {
    case Some(tr) =>
      // This relies on asyncReadHighestSequenceNr having the correct sender()
      // No other calls into the async journal have this as they are called from Future callbacks
      val persistentActor = sender()
      for {
        seqNr <- highestSequenceNr
        // Use the instance bound by the pattern match rather than calling tagRecovery.get again.
        _ <- tr.sendPersistentActorStarting(persistenceId, persistentActor)
        _ <- if (seqNr == fromSequenceNr && seqNr != 0) {
          log.debug(
            "[{}] snapshot is current so replay won't be required. Calculating tag progress now",
            persistenceId)
          // Started before the inner for-comprehension so it runs alongside the progress lookup.
          val scanningSeqNrFut = tr.tagScanningStartingSequenceNr(persistenceId)
          for {
            tp <- tr.lookupTagProgress(persistenceId)
            _ <- tr.setTagProgress(persistenceId, tp)
            scanningSeqNr <- scanningSeqNrFut
            _ <- sendPreSnapshotTagWrites(scanningSeqNr, fromSequenceNr, persistenceId, Long.MaxValue, tp, tr)
          } yield seqNr
        } else if (seqNr == 0) {
          log.debug("[{}] New pid. Sending blank tag progress. [{}]", persistenceId, persistentActor)
          tr.setTagProgress(persistenceId, Map.empty)
        } else {
          FutureUnit
        }
      } yield seqNr
    case None =>
      highestSequenceNr
  }
  // Debug-only side effect; `result` is the Try wrapping the sequence number or the failure.
  toReturn.onComplete { result =>
    log.debug("asyncReadHighestSequenceNr {} returning {}", persistenceId, result)
  }
  toReturn
}