override def allPersistenceIdsSource()

in core/src/main/scala/org/apache/pekko/persistence/jdbc/query/dao/legacy/ByteArrayReadJournalDao.scala [45:60]


  /**
   * Streams the result of the `allPersistenceIdsDistinct` query as a `Source` of strings.
   *
   * @param max upper bound handed to the query after being passed through
   *            `correctMaxForH2Driver` (presumably a row limit; confirm against the query definition)
   * @return a source backed by the database publisher for that query
   */
  override def allPersistenceIdsSource(max: Long): Source[String, NotUsed] = {
    val adjustedMax = correctMaxForH2Driver(max)
    val idsPublisher = db.stream(queries.allPersistenceIdsDistinct(adjustedMax).result)
    Source.fromPublisher(idsPublisher)
  }

  /**
   * Streams the rows returned by the `eventsByTag` query, keeps only the rows accepted by
   * `perfectlyMatchTag`, and runs them through the serializer's deserialize flow.
   *
   * @param tag       tag to look up; it is wrapped in `%` on both sides before being given to the query
   * @param offset    passed to the query unchanged
   * @param maxOffset passed to the query unchanged
   * @param max       passed to the query after going through `correctMaxForH2Driver`
   * @return a source of deserialization attempts, one `Try` per element emitted by the deserialize flow
   */
  override def eventsByTag(
      tag: String,
      offset: Long,
      maxOffset: Long,
      max: Long): Source[Try[(PersistentRepr, Set[String], Long)], NotUsed] = {
    // The query receives the tag surrounded by wildcards, so it may return rows whose tag
    // column merely contains `tag` (presumably via a SQL LIKE; confirm in the query definition).
    val tagPattern = s"%$tag%"
    val adjustedMax = correctMaxForH2Driver(max)
    val taggedRows = db.stream(queries.eventsByTag((tagPattern, offset, maxOffset, adjustedMax)).result)

    // Workaround for https://github.com/akka/akka-persistence-jdbc/issues/168:
    // re-check each row against the exact tag, using the configured tag separator.
    val tagSeparator = readJournalConfig.pluginConfig.tagSeparator
    val exactTagFilter = perfectlyMatchTag(tag, tagSeparator)

    Source.fromPublisher(taggedRows).via(exactTagFilter).via(serializer.deserializeFlow)
  }