private def currentPersistenceIdsByPageInternal[Result: PersistenceIdsResult]()

in src/main/scala/org/apache/pekko/persistence/dynamodb/query/scaladsl/internal/DynamoDBCurrentPersistenceIdsQuery.scala [95:125]


  private def currentPersistenceIdsByPageInternal[Result: PersistenceIdsResult](
      getPersistenceIds: Option[java.util.Map[String, AttributeValue]] => Future[Result])
      : Source[Seq[String], NotUsed] = {
    import system.dispatcher
    type ResultSource = Source[Option[Result], NotUsed]

    def nextCall(maybePreviousResult: Option[Result]): Future[Option[Result]] = {
      val maybeNextResult = for {
        previousResult <- maybePreviousResult
        nextEvaluatedKey <- previousResult.nextEvaluatedKey
      } yield getPersistenceIds(Some(nextEvaluatedKey)).map(Some(_))

      maybeNextResult.getOrElse(Future.successful(None))
    }

    def lazyStream(currentResult: ResultSource): ResultSource = {
      def nextResult: ResultSource = currentResult.mapAsync(parallelism = 1)(nextCall)

      currentResult.concatLazy(Source.lazySource { () => lazyStream(nextResult) })
    }

    val infiniteStreamOfResults: ResultSource =
      lazyStream(Source.fromFuture(getPersistenceIds(None).map(Some(_))))

    infiniteStreamOfResults
      .takeWhile(_.isDefined)
      .flatMapConcat(_.toSource)
      .map(persistenceIdsResult =>
        persistenceIdsResult.toPersistenceIdsPage.flatMap(rawPersistenceId =>
          parsePersistenceId(rawPersistenceId = rawPersistenceId, journalName = readJournalSettings.JournalName)))
  }