in core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/CassandraJournal.scala [673:702]
private def asyncFindHighestSequenceNr(
persistenceId: String,
fromSequenceNr: Long,
partitionSize: Long): Future[Long] = {
def find(currentPnr: Long, currentSnr: Long, foundEmptyPartition: Boolean): Future[Long] = {
// if every message has been deleted and thus no sequence_nr the driver gives us back 0 for "null" :(
val boundSelectHighestSequenceNr = preparedSelectHighestSequenceNr.futureResult().map(ps => {
val bound = ps.bind(persistenceId, currentPnr: JLong)
bound
})
boundSelectHighestSequenceNr
.flatMap(selectOne)
.map { rowOption =>
rowOption.map(_.getLong("sequence_nr"))
}
.flatMap {
case None | Some(0) =>
// never been to this partition, query one more partition because AtomicWrite can span (skip)
// one entire partition
// Some(0) when old schema with static used column, everything deleted in this partition
if (foundEmptyPartition) Future.successful(currentSnr)
else find(currentPnr + 1, currentSnr, foundEmptyPartition = true)
case Some(nextHighest) =>
find(currentPnr + 1, nextHighest, foundEmptyPartition = false)
}
}
find(partitionNr(fromSequenceNr, partitionSize), fromSequenceNr, foundEmptyPartition = false)
}