override def asyncReplayMessages()

in core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/CassandraJournal.scala [717:773]


  /**
   * Streams the events of `persistenceId` from the journal and hands each one to `replayCallback`.
   *
   * When `tagRecovery` is defined, the stored tag progress is looked up and registered, and
   * `sendPreSnapshotTagWrites` is run before any event is read; every event read afterwards is
   * passed through `sendMissingTagWrite` prior to delivery. When it is not defined, events are read
   * and delivered directly.
   *
   * @param persistenceId  id whose events are replayed
   * @param fromSequenceNr lower sequence number bound, forwarded to the events query
   * @param toSequenceNr   upper sequence number bound, forwarded to the events query
   * @param max            forwarded to the events query and to `sendPreSnapshotTagWrites`
   * @param replayCallback invoked for every replayed event, after `queries.mapEvent` has been applied
   * @return a future that completes once the replay stream has finished running
   */
  override def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(
      replayCallback: PersistentRepr => Unit): Future[Unit] = {
    log.debug("[{}] asyncReplayMessages from [{}] to [{}]", persistenceId, fromSequenceNr, toSequenceNr)

    // Only the construction of the event stream depends on whether tag recovery is available;
    // running it against the callback is the same in both cases.
    val replayed: Source[PersistentRepr, Any] = tagRecovery match {
      case None =>
        queries
          .eventsByPersistenceId(
            persistenceId,
            fromSequenceNr,
            toSequenceNr,
            max,
            None,
            settings.journalSettings.readProfile,
            "asyncReplayMessages",
            extractor = Extractors.persistentRepr(eventDeserializer, serialization))
          .map(repr => queries.mapEvent(repr))

      case Some(recovery) =>
        // Created ahead of the for-comprehension so it is not sequenced behind the
        // tag progress lookup.
        val scanStartFut = recovery.tagScanningStartingSequenceNr(persistenceId)

        val progressFut: Future[Map[String, TagProgress]] =
          for {
            progress <- recovery.lookupTagProgress(persistenceId)
            _ <- recovery.setTagProgress(persistenceId, progress)
            scanStart <- scanStartFut
            _ <- sendPreSnapshotTagWrites(scanStart, fromSequenceNr, persistenceId, max, progress, recovery)
          } yield progress

        // The events query is only built once the preparation above has completed.
        val taggedEventsFut = progressFut.map { (progress: Map[Tag, TagProgress]) =>
          log.debug(
            "[{}] starting recovery with tag progress: [{}]. From [{}] to [{}]",
            persistenceId,
            progress,
            fromSequenceNr,
            toSequenceNr)

          queries
            .eventsByPersistenceId(
              persistenceId,
              fromSequenceNr,
              toSequenceNr,
              max,
              None,
              settings.journalSettings.readProfile,
              "asyncReplayMessages",
              extractor = Extractors.taggedPersistentRepr(eventDeserializer, serialization))
            .mapAsync(1)(recovery.sendMissingTagWrite(progress))
        }

        Source
          .futureSource(taggedEventsFut)
          .map(tagged => queries.mapEvent(tagged.pr))
    }

    replayed
      .runForeach(replayCallback)
      .map(_ => ())
  }