private def asyncFindHighestSequenceNr()

in core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/CassandraJournal.scala [673:702]


  /**
   * Walks the journal partitions of `persistenceId` one by one, asking each for its highest
   * `sequence_nr`, and completes with the last non-zero value that was found.
   *
   * The walk begins at `partitionNr(fromSequenceNr, partitionSize)` and moves forward one
   * partition at a time. It ends as soon as two partitions in a row yield nothing; a single
   * empty partition is tolerated because an AtomicWrite can skip one entire partition.
   *
   * @param persistenceId  persistence id bound into the select statement
   * @param fromSequenceNr sequence number used to pick the first partition; it is also the
   *                       result when no partition reports a usable value
   * @param partitionSize  passed to `partitionNr` together with `fromSequenceNr`
   * @return a future holding the highest sequence number seen during the walk
   */
  private def asyncFindHighestSequenceNr(
      persistenceId: String,
      fromSequenceNr: Long,
      partitionSize: Long): Future[Long] = {

    // Highest sequence_nr reported for a single partition, or None when the query yields no row.
    def highestInPartition(pnr: Long): Future[Option[Long]] =
      for {
        prepared <- preparedSelectHighestSequenceNr.futureResult()
        maybeRow <- selectOne(prepared.bind(persistenceId, pnr: JLong))
      } yield maybeRow.map(_.getLong("sequence_nr"))

    def scan(pnr: Long, highestSoFar: Long, previousPartitionEmpty: Boolean): Future[Long] =
      highestInPartition(pnr).flatMap { reported =>
        // A reported 0 is treated like "no row": the driver hands back 0 for a null sequence_nr
        // when every message has been deleted, and the old schema with the static `used` column
        // yields 0 once everything in the partition is deleted.
        reported.filter(_ != 0L) match {
          case Some(snr) =>
            scan(pnr + 1, snr, previousPartitionEmpty = false)
          case None if previousPartitionEmpty =>
            // second empty partition in a row: nothing further to look at
            Future.successful(highestSoFar)
          case None =>
            // never been to this partition; look at one more because an AtomicWrite can span
            // (skip) one entire partition
            scan(pnr + 1, highestSoFar, previousPartitionEmpty = true)
        }
      }

    scan(partitionNr(fromSequenceNr, partitionSize), fromSequenceNr, previousPartitionEmpty = false)
  }