in src/main/scala/org/apache/pekko/persistence/dynamodb/query/scaladsl/internal/DynamoDBCurrentEventsByPersistenceIdQuery.scala [52:71]
/**
 * Streams the stored events of `persistenceId` whose sequence numbers fall in the requested range.
 *
 * The upper bound handed to `eventsStream` is the smaller of `toSequenceNr` and the highest
 * sequence number reported by `readSequenceNr`, so the query never asks for events beyond
 * what that lookup returned.
 *
 * @param persistenceId  identifier of the persistent entity whose events are read
 * @param fromSequenceNr lower bound of the requested sequence number range
 *                       (presumably inclusive, as in the Pekko Persistence Query contract -
 *                       confirm against `eventsStream`)
 * @param toSequenceNr   upper bound of the requested range; must not exceed `Int.MaxValue`
 * @return a source of [[EventEnvelope]]s obtained by calling `toEventEnvelope` on each element
 *         emitted by `eventsStream`
 * @throws IllegalArgumentException if `toSequenceNr` is greater than `Int.MaxValue` or
 *                                  `fromSequenceNr` is greater than `toSequenceNr`
 */
override def currentEventsByPersistenceId(
    persistenceId: String,
    fromSequenceNr: Long,
    toSequenceNr: Long): Source[EventEnvelope, NotUsed] = {
  // NOTE(review): this rejects Long.MaxValue, which callers commonly pass to mean "no upper
  // bound"; the reason for the Int.MaxValue limit is not visible here - confirm before relaxing.
  require(toSequenceNr <= Int.MaxValue, "toSequenceNr can't be bigger than Int.MaxValue")
  // Equal bounds describe a valid single-event range, so only an inverted range is rejected.
  require(fromSequenceNr <= toSequenceNr, "fromSequenceNr can't be bigger than toSequenceNr")
  log.debug("starting currentEventsByPersistenceId for {} from {} to {}", persistenceId, fromSequenceNr, toSequenceNr)
  Source
    // Source.future replaces the deprecated Source.fromFuture with the same semantics.
    .future(readSequenceNr(persistenceId = persistenceId, highest = true))
    .flatMapConcat { highest =>
      // Cap the requested upper bound at the highest sequence number that was looked up.
      val end = Math.min(highest, toSequenceNr)
      eventsStream(
        persistenceId = persistenceId,
        fromSequenceNr = fromSequenceNr,
        toSequenceNr = end,
        max = Int.MaxValue)
    }
    .map(_.toEventEnvelope)
    .log(s"currentEventsByPersistenceId for $persistenceId from $fromSequenceNr to $toSequenceNr")
}