in core/src/main/scala/org/apache/pekko/persistence/cassandra/EventsByTagMigration.scala [187:245]
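  /**
   * Streams each persistence id through the tag recovery machinery to rebuild the
   * tag_views table: restores any existing tag write progress, replays the pid's
   * raw events from the old schema, and flushes buffered tag writes periodically.
   */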
private def migrateToTagViewsInternal(
src: Source[PersistenceId, NotUsed],
periodicFlush: Int,
flushTimeout: Timeout): Future[Done] = {
log.info("Beginning migration of data to tag_views table in keyspace {}", journalSettings.keyspace)
implicit val timeout: Timeout = flushTimeout
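    // Build one sub-stream per persistence id; flatMapConcat runs them one pid at a time.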
val allPids = src
      .map { pid =>
        log.info("Migrating persistence id {}", pid)
        pid
      }
.flatMapConcat(pid => {
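        // Restore where any previous migration/recovery got to for this pid: the
        // per-tag write progress and the sequence nr to resume scanning from.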
val prereqs: Future[(Map[Tag, TagProgress], SequenceNr)] = {
val startingSeqFut = tagRecovery.tagScanningStartingSequenceNr(pid)
for {
tp <- tagRecovery.lookupTagProgress(pid)
_ <- tagRecovery.setTagProgress(pid, tp)
startingSeq <- startingSeqFut
} yield (tp, startingSeq)
}
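        // Number of tag writes to buffer before forcing a flush.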
val flushBatchSize = periodicFlushBatchSize(periodicFlush)
        // It would be nice to batch these up into a single TagWrites message, but
        // reusing the recovery code keeps the migration on the same path as recovery.
Source.futureSource {
prereqs.map {
case (tp, startingSeq) => {
log.info(
"Starting migration for pid: {} based on progress: {} starting at sequence nr: {}",
pid,
tp,
startingSeq)
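              // Replay this pid's raw events from the starting sequence nr, using the
              // old-schema extractor to read tags from the pre-migration layout.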
queries
.eventsByPersistenceId[RawEvent](
pid,
startingSeq,
Long.MaxValue,
Long.MaxValue,
None,
settings.querySettings.readProfile,
s"migrateToTag-$pid",
extractor =
EventsByTagMigration.rawPayloadOldTagSchemaExtractor(eventsByTagSettings.bucketSize, system))
                .map(tagRecovery.sendMissingTagWriteRaw(tp, actorRunning = false)) // send tag writes not yet in tag_views
                .grouped(flushBatchSize) // bound how much is buffered between flushes
                .mapAsync(1)(_ => tagRecovery.flush(timeout)) // flush batches sequentially, preserving order
}
}
}
})
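    // Drain every per-pid stream, then ask the TagWriters to flush all outstanding
    // writes before signalling completion.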
for {
_ <- allPids.runWith(Sink.ignore)
_ <- (tagWriters ? FlushAllTagWriters(timeout)).mapTo[AllFlushed.type]
} yield Done
}
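
  // Usage sketch (illustrative, not part of this file): the private method above is
  // reached through the public migration API. Assuming the companion's apply and the
  // createTables/addTagsColumn/migrateToTagViews wrappers documented for this module,
  // a full migration run looks roughly like this:
  //
  //   import org.apache.pekko.Done
  //   import org.apache.pekko.actor.ActorSystem
  //   import org.apache.pekko.persistence.cassandra.EventsByTagMigration
  //   import scala.concurrent.Future
  //
  //   def runMigration()(implicit system: ActorSystem): Future[Done] = {
  //     import system.dispatcher
  //     val migrator = EventsByTagMigration(system)
  //     for {
  //       _ <- migrator.createTables()          // create tag_views and progress tables if missing
  //       _ <- migrator.addTagsColumn()         // add the tags column to the messages table
  //       done <- migrator.migrateToTagViews()  // streams all pids through migrateToTagViewsInternal
  //     } yield done
  //   }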