private def migrateToTagViewsInternal()

in core/src/main/scala/org/apache/pekko/persistence/cassandra/EventsByTagMigration.scala [187:245]


  /**
   * Runs the tag_views migration for every persistence id emitted by `src`.
   *
   * Persistence ids are processed one after another (`flatMapConcat`). For each id the
   * tag progress is looked up and stored through `tagRecovery`; its events are then read
   * from the tag-scanning starting sequence number onwards and passed to
   * `tagRecovery.sendMissingTagWriteRaw`. Events are grouped into batches of at most
   * `periodicFlushBatchSize(periodicFlush)` elements and `tagRecovery.flush` is called
   * once per batch. When the whole stream has completed, the tag writers are asked to
   * flush everything.
   *
   * @param src           persistence ids to migrate
   * @param periodicFlush input to `periodicFlushBatchSize`, which yields the number of
   *                      events grouped together between two flushes
   * @param flushTimeout  timeout handed to every flush request and used for the final
   *                      ask to the tag writers
   * @return a future completed with `Done` once the final `AllFlushed` reply has arrived
   */
  private def migrateToTagViewsInternal(
      src: Source[PersistenceId, NotUsed],
      periodicFlush: Int,
      flushTimeout: Timeout): Future[Done] = {
    log.info("Beginning migration of data to tag_views table in keyspace {}", journalSettings.keyspace)

    // Needed implicitly by the `?` at the end; also passed explicitly to the flush calls.
    implicit val timeout: Timeout = flushTimeout

    // Looks up and stores the tag progress of one persistence id and pairs it with the
    // sequence number from which the events have to be scanned.
    def loadPrerequisites(pid: PersistenceId): Future[(Map[Tag, TagProgress], SequenceNr)] = {
      // Requested up front, before the progress lookup chain is started, so it does not
      // have to wait for that chain to finish.
      val scanStart = tagRecovery.tagScanningStartingSequenceNr(pid)
      tagRecovery.lookupTagProgress(pid).flatMap { progress =>
        tagRecovery.setTagProgress(pid, progress).flatMap { _ =>
          scanStart.map(fromSeqNr => (progress, fromSeqNr))
        }
      }
    }

    val loggedIds = src.map { persistenceId =>
      log.info("Migrating the following persistence ids {}", persistenceId)
      persistenceId
    }

    val migration = loggedIds.flatMapConcat { pid =>
      val prerequisites = loadPrerequisites(pid)
      val batchSize = periodicFlushBatchSize(periodicFlush)

      // would be nice to group these up into a TagWrites message but also
      // nice that this reuses the recovery code :-/
      Source.futureSource(prerequisites.map {
        case (progress, fromSeqNr) =>
          log.info(
            "Starting migration for pid: {} based on progress: {} starting at sequence nr: {}",
            pid,
            progress,
            fromSeqNr)

          val rawEvents = queries.eventsByPersistenceId[RawEvent](
            pid,
            fromSeqNr,
            Long.MaxValue,
            Long.MaxValue,
            None,
            settings.querySettings.readProfile,
            s"migrateToTag-$pid",
            extractor =
              EventsByTagMigration.rawPayloadOldTagSchemaExtractor(eventsByTagSettings.bucketSize, system))

          rawEvents
            .map(tagRecovery.sendMissingTagWriteRaw(progress, actorRunning = false))
            .grouped(batchSize)
            // one flush per batch, never more than one in flight
            .mapAsync(1)(_ => tagRecovery.flush(timeout))
      })
    }

    // Drain the stream first, then make sure every tag writer has flushed.
    migration.runWith(Sink.ignore).flatMap { _ =>
      (tagWriters ? FlushAllTagWriters(timeout)).mapTo[AllFlushed.type].map(_ => Done)
    }
  }