in core/src/main/scala/org/apache/pekko/persistence/cassandra/query/scaladsl/CassandraReadJournal.scala [374:389]
private def eventsByTagPrereqs(tag: String, usingOffset: Boolean, fromOffset: UUID)
: Future[(EventByTagStatements, Map[Tag, (TagPidSequenceNr, UUID)], TagViewSequenceNumberScanner)] = {
val currentBucket =
TimeBucket(System.currentTimeMillis(), eventsByTagSettings.bucketSize)
val initialTagPidSequenceNrs =
if (usingOffset && currentBucket.within(fromOffset) && eventsByTagSettings.offsetScanning > Duration.Zero)
calculateStartingTagPidSequenceNrs(tag, fromOffset)
else
Future.successful(Map.empty[Tag, (TagPidSequenceNr, UUID)])
for {
statements <- combinedEventsByTagStmts
tagSequenceNrs <- initialTagPidSequenceNrs
tagViewScanner <- tagViewScanner
} yield (statements, tagSequenceNrs, tagViewScanner)
}