in core/src/main/scala/org/apache/pekko/persistence/jdbc/query/dao/legacy/ByteArrayReadJournalDao.scala [127:158]
abstract override def eventsByTag(
tag: String,
offset: Long,
maxOffset: Long,
max: Long): Source[Try[(PersistentRepr, Set[String], Long)], NotUsed] = {
if (isOracleDriver(profile)) {
val theOffset = Math.max(0, offset)
val theTag = s"%$tag%"
val selectStatement =
sql"""
SELECT "#$ordering", "#$deleted", "#$persistenceId", "#$sequenceNumber", "#$message", "#$tags"
FROM (
SELECT * FROM #$theTableName
WHERE "#$tags" LIKE $theTag
AND "#$ordering" > $theOffset
AND "#$ordering" <= $maxOffset
AND "#$deleted" = 'false'
ORDER BY "#$ordering"
)
WHERE rownum <= $max""".as[JournalRow]
// applies workaround for https://github.com/akka/akka-persistence-jdbc/issues/168
Source
.fromPublisher(db.stream(selectStatement))
.via(perfectlyMatchTag(tag, readJournalConfig.pluginConfig.tagSeparator))
.via(serializer.deserializeFlow)
} else {
super.eventsByTag(tag, offset, maxOffset, max)
}
}