private[pekko] def scan()

in core/src/main/scala/org/apache/pekko/persistence/cassandra/query/TagViewSequenceNumberScanner.scala [74:136]


  private[pekko] def scan(
      tag: String,
      fromOffset: UUID,
      toOffset: UUID,
      bucketSize: BucketSize,
      scanningPeriod: FiniteDuration,
      whichToKeep: (TagPidSequenceNr,
          TagPidSequenceNr) => TagPidSequenceNr): Future[Map[PersistenceId, (TagPidSequenceNr, UUID)]] = {

    def doIt(): Future[Map[PersistenceId, (TagPidSequenceNr, UUID)]] = {

      // How many buckets is this spread across?
      val startBucket = TimeBucket(fromOffset, bucketSize)
      val endBucket = TimeBucket(toOffset, bucketSize)

      require(startBucket <= endBucket)

      if (log.isDebugEnabled) {
        log.debug(
          s"Scanning tag: $tag from: {}, to: {}. Bucket {} to {}",
          formatOffset(fromOffset),
          formatOffset(toOffset),
          startBucket,
          endBucket)
      }

      Source
        .unfold(startBucket)(current => {
          if (current <= endBucket) {
            Some((current.next(), current))
          } else {
            None
          }
        })
        .flatMapConcat(bucket => {
          log.debug("Scanning bucket {}", bucket)
          session.selectTagSequenceNrs(tag, bucket, fromOffset, toOffset)
        })
        .map(row => (row.getString("persistence_id"), row.getLong("tag_pid_sequence_nr"), row.getUuid("timestamp")))
        .toMat(Sink.fold(Map.empty[Tag, (TagPidSequenceNr, UUID)]) {
          case (acc, (pid, tagPidSequenceNr, timestamp)) =>
            val (newTagPidSequenceNr, newTimestamp) = acc.get(pid) match {
              case None =>
                (tagPidSequenceNr, timestamp)
              case Some((currentTagPidSequenceNr, currentTimestamp)) =>
                if (whichToKeep(tagPidSequenceNr, currentTagPidSequenceNr) == tagPidSequenceNr)
                  (tagPidSequenceNr, timestamp)
                else
                  (currentTagPidSequenceNr, currentTimestamp)
            }
            acc + (pid -> ((newTagPidSequenceNr, newTimestamp)))
        })(Keep.right)
        .withAttributes(ActorAttributes.dispatcher(pluginDispatcher))
        .run()
    }

    if (scanningPeriod > Duration.Zero) {
      after(scanningPeriod)(doIt())(materializer.system)
    } else {
      doIt()
    }

  }