in core/src/main/scala/org/apache/pekko/persistence/jdbc/query/scaladsl/JdbcReadJournal.scala [101:128]
/**
 * Returns the stream of `persistenceId`s produced by the read-journal DAO.
 *
 * Delegates to `readJournalDao.allPersistenceIdsSource`, passing `Long.MaxValue` as its
 * argument (presumably an upper bound on the number of ids returned, i.e. effectively
 * unbounded; confirm against the DAO).
 */
override def currentPersistenceIds(): Source[String, NotUsed] = {
  val limit = Long.MaxValue
  readJournalDao.allPersistenceIdsSource(limit)
}
/**
 * `persistenceIds` provides a live stream of every `persistenceId`, as strings.
 *
 * Each `persistenceId` is emitted at most once, so the stream contains no duplicates.
 * The order of emission is not defined: running the same stream several times (even a
 * bounded one) may yield the `persistenceId`s in a different sequence.
 *
 * The stream does not complete after the currently known `persistenceId`s have been
 * emitted; it keeps pushing new `persistenceId`s as new events are persisted.
 * Use `currentPersistenceIds` for the corresponding query that completes once the
 * currently known `persistenceId`s have been emitted.
 */
override def persistenceIds(): Source[String, NotUsed] = {
  // One polling round: every element emitted by `delaySource` triggers a full run of
  // `currentPersistenceIds()`. Kept as a `def` so it is re-evaluated for each repeat.
  def pollOnce: Source[String, NotUsed] =
    delaySource.flatMapConcat(_ => currentPersistenceIds())

  Source
    .repeat(0)
    .flatMapConcat(_ => pollOnce)
    .statefulMapConcat[String] { () =>
      // Ids that have already been pushed downstream; this set only ever grows.
      var emitted = Set.empty[String]
      id =>
        if (emitted.contains(id)) Nil // seen in an earlier (or the same) round: drop it
        else {
          emitted += id
          id :: Nil
        }
    }
}