override def asyncReadHighestSequenceNr()

in core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/CassandraJournal.scala [448:492]


  /**
   * Reads the highest sequence number for `persistenceId` and, when `tagRecovery` is
   * defined, prepares tag state for the persistent actor that issued the request.
   *
   * If `writeInProgress` holds a future for this `persistenceId`, the read is chained onto
   * that future and therefore only starts after it has completed successfully.
   *
   * When `tagRecovery` is defined, once the sequence number is known:
   *  - the tag recovery is notified that the persistent actor (the current `sender()`) is starting;
   *  - if the result equals `fromSequenceNr` and is non-zero, tag progress is looked up and
   *    set, and pre-snapshot tag writes are sent;
   *  - if the result is 0, empty tag progress is set;
   *  - otherwise no further tag work is done here.
   *
   * @param persistenceId  id of the persistent actor whose highest sequence number is read
   * @param fromSequenceNr sequence number passed through to the internal read; also compared
   *                       with the result to decide whether tag progress is calculated here
   * @return a future of the highest sequence number; it fails if any chained step fails
   */
  override def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = {
    log.debug("[{}] asyncReadHighestSequenceNr [{}] [{}]", persistenceId, fromSequenceNr, sender())
    // The lookup returns null when nothing is registered for this persistence id,
    // so wrap it in Option instead of pattern matching on null.
    val highestSequenceNr = Option(writeInProgress.get(persistenceId)) match {
      case Some(pendingWrite) =>
        pendingWrite.flatMap(_ => asyncReadHighestSequenceNrInternal(persistenceId, fromSequenceNr))
      case None =>
        asyncReadHighestSequenceNrInternal(persistenceId, fromSequenceNr)
    }

    val toReturn = tagRecovery match {
      case Some(tr) =>
        // This relies on asyncReadHighestSequenceNr having the correct sender()
        // No other calls into the async journal have this as they are called from Future callbacks
        val persistentActor = sender()
        for {
          seqNr <- highestSequenceNr
          // Use the instance bound by the pattern rather than calling `tagRecovery.get` again.
          _ <- tr.sendPersistentActorStarting(persistenceId, persistentActor)
          _ <- if (seqNr == fromSequenceNr && seqNr != 0) {
            log.debug(
              "[{}] snapshot is current so replay won't be required. Calculating tag progress now",
              persistenceId)
            // Started before the comprehension below so it runs alongside the tag progress lookup.
            val scanningSeqNrFut = tr.tagScanningStartingSequenceNr(persistenceId)
            for {
              tp <- tr.lookupTagProgress(persistenceId)
              _ <- tr.setTagProgress(persistenceId, tp)
              scanningSeqNr <- scanningSeqNrFut
              _ <- sendPreSnapshotTagWrites(scanningSeqNr, fromSequenceNr, persistenceId, Long.MaxValue, tp, tr)
            } yield seqNr
          } else if (seqNr == 0) {
            log.debug("[{}] New pid. Sending blank tag progress. [{}]", persistenceId, persistentActor)
            tr.setTagProgress(persistenceId, Map.empty)
          } else {
            FutureUnit
          }
        } yield seqNr
      case None =>
        highestSequenceNr
    }

    // Debug-log the outcome (success or failure) without altering the returned future.
    toReturn.onComplete { highestSeq =>
      log.debug("asyncReadHighestSequenceNr {} returning {}", persistenceId, highestSeq)
    }

    toReturn
  }