in core/src/main/scala/org/apache/pekko/persistence/cassandra/query/TagViewSequenceNumberScanner.scala [74:136]
/**
 * Scans the tag-sequence-number rows for `tag` between `fromOffset` and `toOffset`,
 * covering every time bucket from the one containing `fromOffset` through the one
 * containing `toOffset`. It returns, per persistence id, a single tag pid sequence
 * number and the timestamp of the row it came from.
 *
 * @param tag            the tag whose rows are scanned
 * @param fromOffset     time-based UUID marking the start of the scan range
 * @param toOffset       time-based UUID marking the end of the scan range; its bucket
 *                       must not be before the bucket of `fromOffset`
 * @param bucketSize     bucket size used to map offsets to [[TimeBucket]]s
 * @param scanningPeriod if greater than zero, the scan is started only after this delay
 * @param whichToKeep    given (candidate, current) sequence numbers for the same
 *                       persistence id, returns the one to keep; when it returns the
 *                       candidate, the candidate's timestamp replaces the stored one
 * @return a future of persistence id -> (chosen tag pid sequence nr, its row timestamp).
 *         Failures, including an invalid offset range, are always reported as a
 *         failed future rather than thrown.
 */
private[pekko] def scan(
  tag: String,
  fromOffset: UUID,
  toOffset: UUID,
  bucketSize: BucketSize,
  scanningPeriod: FiniteDuration,
  whichToKeep: (TagPidSequenceNr,
  TagPidSequenceNr) => TagPidSequenceNr): Future[Map[PersistenceId, (TagPidSequenceNr, UUID)]] = {

  def doIt(): Future[Map[PersistenceId, (TagPidSequenceNr, UUID)]] = {
    // How many buckets is this spread across?
    val startBucket = TimeBucket(fromOffset, bucketSize)
    val endBucket = TimeBucket(toOffset, bucketSize)
    require(
      startBucket <= endBucket,
      s"Start bucket [$startBucket] of fromOffset must not be after end bucket [$endBucket] of toOffset")
    if (log.isDebugEnabled) {
      // Build the message fully here (we are already behind the isDebugEnabled guard)
      // so `{}` characters inside the tag cannot be mistaken for log placeholders.
      log.debug(
        s"Scanning tag: $tag from: ${formatOffset(fromOffset)}, to: ${formatOffset(toOffset)}. " +
        s"Bucket $startBucket to $endBucket")
    }
    Source
      .unfold(startBucket) { current =>
        if (current <= endBucket) Some((current.next(), current)) else None
      }
      .flatMapConcat { bucket =>
        log.debug("Scanning bucket {}", bucket)
        session.selectTagSequenceNrs(tag, bucket, fromOffset, toOffset)
      }
      .map(row => (row.getString("persistence_id"), row.getLong("tag_pid_sequence_nr"), row.getUuid("timestamp")))
      .toMat(Sink.fold(Map.empty[PersistenceId, (TagPidSequenceNr, UUID)]) {
        case (acc, (pid, tagPidSequenceNr, timestamp)) =>
          val toStore = acc.get(pid) match {
            // Keep the existing entry only when whichToKeep rejects the new candidate.
            case Some(existing @ (currentTagPidSequenceNr, _))
                if whichToKeep(tagPidSequenceNr, currentTagPidSequenceNr) != tagPidSequenceNr =>
              existing
            case _ =>
              (tagPidSequenceNr, timestamp)
          }
          acc.updated(pid, toStore)
      })(Keep.right)
      .withAttributes(ActorAttributes.dispatcher(pluginDispatcher))
      .run()
  }

  // Convert synchronous failures (e.g. the require above) into a failed Future so the
  // immediate path behaves like the delayed path, where `after` already does this.
  def safeDoIt(): Future[Map[PersistenceId, (TagPidSequenceNr, UUID)]] =
    try doIt()
    catch {
      case scala.util.control.NonFatal(e) => Future.failed(e)
    }

  if (scanningPeriod > Duration.Zero) {
    after(scanningPeriod)(safeDoIt())(materializer.system)
  } else {
    safeDoIt()
  }
}