in src/main/scala/org/apache/pekko/persistence/dynamodb/query/scaladsl/internal/DynamoDBCurrentPersistenceIdsQuery.scala [95:125]
/**
 * Streams persistence ids page by page.
 *
 * `getPersistenceIds` is called with `None` for the first page and with
 * `Some(nextEvaluatedKey)` of the previously returned page for every following one.
 * Paging stops after the first page whose `nextEvaluatedKey` is empty.
 *
 * Pages are requested one at a time and only on downstream demand; every page is
 * requested exactly once per materialization of the returned source, and nothing
 * is requested before the source is materialized.
 *
 * @param getPersistenceIds fetches one page, starting after the given key (if any)
 * @tparam Result page type, accessed through its [[PersistenceIdsResult]] instance
 * @return one element per fetched page: the page's raw persistence ids, each passed
 *         through `parsePersistenceId` with the results flattened
 */
private def currentPersistenceIdsByPageInternal[Result: PersistenceIdsResult](
    getPersistenceIds: Option[java.util.Map[String, AttributeValue]] => Future[Result])
    : Source[Seq[String], NotUsed] = {
  import system.dispatcher

  // Paging state threaded through `unfoldAsync`:
  //   Some(startKey) -> a page still has to be fetched (`startKey == None` is the first page)
  //   None           -> the last page has already been emitted
  type PagingState = Option[Option[java.util.Map[String, AttributeValue]]]
  val firstPage: PagingState = Some(None)

  Source
    .unfoldAsync[PagingState, Result](firstPage) {
      case None =>
        // Previous page carried no continuation key: complete the stream.
        Future.successful(None)
      case Some(maybeStartKey) =>
        getPersistenceIds(maybeStartKey).map { result =>
          // Carrying only the continuation key forward (instead of stacking another
          // stage on top of the previous source) keeps the number of requests linear
          // in the number of pages.
          val nextState: PagingState = result.nextEvaluatedKey.map(key => Some(key))
          Some((nextState, result))
        }
    }
    .map { persistenceIdsResult =>
      persistenceIdsResult.toPersistenceIdsPage.flatMap { rawPersistenceId =>
        parsePersistenceId(rawPersistenceId = rawPersistenceId, journalName = readJournalSettings.JournalName)
      }
    }
}