private def sendPreSnapshotTagWrites()

in core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/CassandraJournal.scala [775:815]


  /**
   * Re-sends tag writes for events that sit before the point recovery starts from.
   *
   * When `minProgressNr` is lower than `fromSequenceNr`, the events of `pid` in the inclusive
   * range `minProgressNr` to `fromSequenceNr - 1` are read back and every event that carries a
   * tagged representation is handed to `tr.sendMissingTagWrite(tp)`. Events are processed one
   * at a time, in order, and the returned future completes once the whole range has been
   * consumed. When `minProgressNr` is not lower than `fromSequenceNr` nothing is read and an
   * already completed future is returned.
   *
   * @param minProgressNr  first sequence number to scan from
   * @param fromSequenceNr sequence number recovery starts at; scanning stops just before it
   * @param pid            persistence id whose events are scanned
   * @param max            forwarded unchanged to `queries.eventsByPersistenceId`
   *                       (presumably a cap on the number of events read - TODO confirm)
   * @param tp             tag progress map forwarded to `tr.sendMissingTagWrite`
   * @param tr             tag recovery used to send the missing tag writes
   * @return a future that completes when all scanned events have been handled
   */
  private def sendPreSnapshotTagWrites(
      minProgressNr: Long,
      fromSequenceNr: Long,
      pid: String,
      max: Long,
      tp: Map[Tag, TagProgress],
      tr: CassandraTagRecovery): Future[Done] =
    if (minProgressNr >= fromSequenceNr) {
      // Nothing lies between the minimum progress and the recovery start: no scan required.
      log.debug(
        "[{}] Recovery is starting before the latest tag writes tag progress. Min progress [{}]. From sequence nr of recovery: [{}]",
        pid,
        minProgressNr,
        fromSequenceNr)
      FutureDone
    } else {
      // The event at fromSequenceNr itself is excluded from this scan.
      val lastSeqNrToScan = fromSequenceNr - 1
      log.debug(
        "[{}], Scanning events before snapshot to recover tag_views: From: [{}] to: [{}]",
        pid,
        minProgressNr,
        lastSeqNrToScan)

      val preSnapshotEvents = queries.eventsByPersistenceId(
        pid,
        minProgressNr,
        lastSeqNrToScan,
        max,
        None,
        settings.journalSettings.readProfile,
        "asyncReplayMessagesPreSnapshot",
        Extractors.optionalTaggedPersistentRepr(eventDeserializer, serialization))

      preSnapshotEvents
        // Parallelism of 1: each tag write completes before the next event is handled.
        .mapAsync(1) { extracted =>
          extracted.tagged match {
            case OptionVal.Some(taggedRepr) => tr.sendMissingTagWrite(tp)(taggedRepr)
            case OptionVal.None             => FutureDone // event has no tags, nothing to send
          }
        }
        .runWith(Sink.ignore)
    }