def eventsStream()

in src/main/scala/org/apache/pekko/persistence/dynamodb/journal/DynamoDBRecovery.scala [230:248]


  def eventsStream(
      persistenceId: String,
      fromSequenceNr: Long,
      toSequenceNr: Long,
      max: Long): Source[PersistentRepr, NotUsed] =
    // toSequenceNr is already capped to highest and guaranteed to be no less than fromSequenceNr
    Source.future(readSequenceNr(persistenceId, highest = false)).flatMapConcat { lowest =>
      val start = Math.max(fromSequenceNr, lowest)
      val async = ReplayParallelism > 1

      Source(start to toSequenceNr)
        .via(DynamoPartitionGrouped)
        .mapAsync(ReplayParallelism)(batch => getPartitionItems(persistenceId, batch).map(_.sorted))
        .mapConcat(identity)
        .take(max)
        .via(RemoveIncompleteAtoms)
        .mapConcat(identity)
        .mapAsync(ReplayParallelism)(readPersistentRepr(_, async))
    }