in core/src/main/scala/org/apache/pekko/persistence/jdbc/query/dao/DefaultReadJournalDao.scala [44:58]
/**
 * Streams the results of the `allPersistenceIdsDistinct` query as a `Source`.
 *
 * @param max requested limit; it is run through `correctMaxForH2Driver` before
 *            being handed to the query
 * @return a source fed by the publisher that `db.stream` returns for the query
 */
override def allPersistenceIdsSource(max: Long): Source[String, NotUsed] = {
  val limit = correctMaxForH2Driver(max)
  val idsAction = queries.allPersistenceIdsDistinct(limit).result
  val idsPublisher = db.stream(idsAction)
  Source.fromPublisher(idsPublisher)
}
/**
 * Streams the rows produced by the `eventsByTag` query and deserializes each
 * row with `PekkoSerialization.fromRow`.
 *
 * @param tag       first element of the tuple passed to the query
 * @param offset    second element of the tuple passed to the query
 * @param maxOffset third element of the tuple passed to the query
 * @param max       requested limit; it is run through `correctMaxForH2Driver`
 *                  before being passed as the fourth element
 * @return one element per streamed row: the deserialization result, where a
 *         success carries the representation, an empty tag set and the ordering
 */
override def eventsByTag(
    tag: String,
    offset: Long,
    maxOffset: Long,
    max: Long): Source[Try[(PersistentRepr, Set[String], Long)], NotUsed] = {
  val limit = correctMaxForH2Driver(max)
  val taggedRows = db.stream(queries.eventsByTag((tag, offset, maxOffset, limit)).result)

  Source.fromPublisher(taggedRows).map { row =>
    val deserialized = PekkoSerialization.fromRow(serialization)(row)
    // The tag set is intentionally left empty; as far as can be told, nothing
    // downstream reads it.
    deserialized.map { case (repr, ordering) => (repr, Set.empty[String], ordering) }
  }
}