in core/src/main/scala/org/apache/pekko/persistence/jdbc/query/dao/legacy/ByteArrayReadJournalDao.scala [45:60]
/**
 * Streams the rows produced by the `allPersistenceIdsDistinct` query as a `Source`.
 *
 * @param max requested row limit; it is passed through `correctMaxForH2Driver` before
 *            being handed to the query (NOTE(review): presumably an H2-specific
 *            adjustment of the limit, confirm against that helper)
 * @return a source emitting the persistence ids returned by the database stream
 */
override def allPersistenceIdsSource(max: Long): Source[String, NotUsed] = {
  val idsQuery = queries.allPersistenceIdsDistinct(correctMaxForH2Driver(max))
  val idsPublisher = db.stream(idsQuery.result)
  Source.fromPublisher(idsPublisher)
}
/**
 * Streams the journal rows selected by the `eventsByTag` query and deserializes them.
 *
 * The query is given the tag wrapped in `%` on both sides, so the database side matches
 * on a pattern rather than on the exact tag. The resulting stream is then passed through
 * `perfectlyMatchTag` with the requested tag and the configured tag separator, which is
 * the workaround for https://github.com/akka/akka-persistence-jdbc/issues/168
 * (NOTE(review): presumably this drops rows whose tags only contain `tag` as a
 * substring, confirm against `perfectlyMatchTag`).
 *
 * @param tag       tag to look up
 * @param offset    offset value forwarded to the query
 * @param maxOffset maximum offset value forwarded to the query
 * @param max       requested row limit, passed through `correctMaxForH2Driver` first
 * @return a source of the elements produced by `serializer.deserializeFlow`
 */
override def eventsByTag(
    tag: String,
    offset: Long,
    maxOffset: Long,
    max: Long): Source[Try[(PersistentRepr, Set[String], Long)], NotUsed] = {
  // Pattern handed to the query: the tag surrounded by `%` wildcards.
  val tagPattern = s"%$tag%"
  val taggedRows =
    db.stream(queries.eventsByTag((tagPattern, offset, maxOffset, correctMaxForH2Driver(max))).result)

  // applies workaround for https://github.com/akka/akka-persistence-jdbc/issues/168
  val exactTagFilter = perfectlyMatchTag(tag, readJournalConfig.pluginConfig.tagSeparator)

  Source.fromPublisher(taggedRows).via(exactTagFilter).via(serializer.deserializeFlow)
}