in src/main/scala/org/apache/pekko/persistence/dynamodb/journal/DynamoDBRecovery.scala [230:248]
/**
 * Builds the replay stream for `persistenceId` over the inclusive sequence-number
 * window that ends at `toSequenceNr`.
 *
 * The window starts at the larger of `fromSequenceNr` and the value produced by
 * `readSequenceNr(persistenceId, highest = false)` (presumably the lowest sequence
 * number still stored for this persistence id -- confirm against `readSequenceNr`).
 *
 * Pipeline stages, in order:
 *  1. emit every sequence number in the window;
 *  2. batch them through `DynamoPartitionGrouped`;
 *  3. load each batch with `getPartitionItems` (up to `ReplayParallelism` in flight)
 *     and sort the loaded items of that batch;
 *  4. flatten the batches and keep at most `max` items;
 *  5. run the items through `RemoveIncompleteAtoms` and flatten its output;
 *  6. convert each item with `readPersistentRepr` (up to `ReplayParallelism` in flight).
 *
 * @param persistenceId  persistence id whose events are streamed
 * @param fromSequenceNr requested lower bound of the window (inclusive)
 * @param toSequenceNr   upper bound of the window (inclusive); expected to be already
 *                       capped to the highest sequence number and not below
 *                       `fromSequenceNr`
 * @param max            limit on the number of loaded items kept, applied before
 *                       `RemoveIncompleteAtoms`
 * @return a source of the resulting `PersistentRepr` elements
 */
def eventsStream(
    persistenceId: String,
    fromSequenceNr: Long,
    toSequenceNr: Long,
    max: Long): Source[PersistentRepr, NotUsed] = {
  // Precondition: toSequenceNr is already capped to highest and is guaranteed to be
  // no less than fromSequenceNr.
  val lowestStored = Source.future(readSequenceNr(persistenceId, highest = false))

  lowestStored.flatMapConcat { lowestSeqNr =>
    // Never start below what readSequenceNr reported.
    val firstSeqNr = fromSequenceNr.max(lowestSeqNr)
    // Flag handed to readPersistentRepr; set only when more than one element may be in flight.
    val decodeAsync = ReplayParallelism > 1

    // Stages 1-3 plus flattening: sequence numbers -> batches -> sorted items.
    val sortedItems =
      Source(firstSeqNr to toSequenceNr)
        .via(DynamoPartitionGrouped)
        .mapAsync(ReplayParallelism) { batch =>
          getPartitionItems(persistenceId, batch).map(_.sorted)
        }
        .mapConcat(identity)

    // Stages 4-6: limit, filter through RemoveIncompleteAtoms, convert.
    sortedItems
      .take(max)
      .via(RemoveIncompleteAtoms)
      .mapConcat(identity)
      .mapAsync(ReplayParallelism)(item => readPersistentRepr(item, decodeAsync))
  }
}