in core/src/main/scala/org/apache/pekko/persistence/cassandra/reconciler/BuildTagViewForPersistenceId.scala [58:84]
def reconcile(flushEvery: Int = 1000): Future[Done] = {
val recoveryPrep = for {
tp <- recovery.lookupTagProgress(persistenceId)
_ <- recovery.setTagProgress(persistenceId, tp)
} yield tp
Source
.futureSource(recoveryPrep.map((tp: Map[String, TagProgress]) => {
log.debug("[{}] Rebuilding tag view table from: [{}]", persistenceId, tp)
queries
.eventsByPersistenceId(
persistenceId,
0,
Long.MaxValue,
Long.MaxValue,
None,
settings.journalSettings.readProfile,
"BuildTagViewForPersistenceId",
extractor = Extractors.rawEvent(settings.eventsByTagSettings.bucketSize, serialization, system))
.map(recovery.sendMissingTagWriteRaw(tp, actorRunning = false))
.buffer(flushEvery, OverflowStrategy.backpressure)
.mapAsync(1)(_ => recovery.flush(flushTimeout))
}))
.runWith(Sink.ignore)
}