override def currentEventsByPersistenceId()

in src/main/scala/org/apache/pekko/persistence/dynamodb/query/scaladsl/internal/DynamoDBCurrentEventsByPersistenceIdQuery.scala [52:71]


  override def currentEventsByPersistenceId(
      persistenceId: String,
      fromSequenceNr: Long,
      toSequenceNr: Long): Source[EventEnvelope, NotUsed] = {
    require(toSequenceNr <= Int.MaxValue, "toSequenceNr can't be bigger than Int.MaxValue")
    require(fromSequenceNr < toSequenceNr, "fromSequenceNr should be smaller than toSequenceNr")
    log.debug("starting currentEventsByPersistenceId for {} from {} to {}", persistenceId, fromSequenceNr, toSequenceNr)
    Source
      .future(readSequenceNr(persistenceId = persistenceId, highest = true))
      .flatMapConcat { highest =>
        val end = Math.min(highest, toSequenceNr)
        eventsStream(
          persistenceId = persistenceId,
          fromSequenceNr = fromSequenceNr,
          toSequenceNr = end,
          max = Int.MaxValue)
      }
      .map(_.toEventEnvelope)
      .log(s"currentEventsByPersistenceId for $persistenceId from $fromSequenceNr to $toSequenceNr")
  }