override def allPersistenceIdsSource()

in core/src/main/scala/org/apache/pekko/persistence/jdbc/query/dao/DefaultReadJournalDao.scala [44:58]


  /**
   * Streams the result of the `allPersistenceIdsDistinct` query as a `Source`.
   *
   * @param max requested upper bound on the number of ids; it is passed through
   *            `correctMaxForH2Driver` before being handed to the query
   * @return a source backed by the database publisher for the query
   */
  override def allPersistenceIdsSource(max: Long): Source[String, NotUsed] = {
    val limit = correctMaxForH2Driver(max)
    val idPublisher = db.stream(queries.allPersistenceIdsDistinct(limit).result)
    Source.fromPublisher(idPublisher)
  }

  /**
   * Streams the rows returned by the `eventsByTag` query, deserializing each one
   * with `PekkoSerialization.fromRow`.
   *
   * @param tag       tag passed to the query
   * @param offset    offset passed to the query
   * @param maxOffset maximum offset passed to the query
   * @param max       requested row limit; it is passed through
   *                  `correctMaxForH2Driver` before being handed to the query
   * @return a source of deserialization attempts; each success carries the
   *         representation, an empty tag set, and the ordering value of the row
   */
  override def eventsByTag(
      tag: String,
      offset: Long,
      maxOffset: Long,
      max: Long): Source[Try[(PersistentRepr, Set[String], Long)], NotUsed] = {
    val limit = correctMaxForH2Driver(max)
    val rowPublisher = db.stream(queries.eventsByTag((tag, offset, maxOffset, limit)).result)

    // The tag set is intentionally left empty here: tags are not populated by
    // this DAO and, as far as can be told, are not consumed downstream.
    Source.fromPublisher(rowPublisher).map { row =>
      PekkoSerialization.fromRow(serialization)(row).map { case (repr, ordering) =>
        (repr, Set.empty[String], ordering)
      }
    }
  }