in core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/JournalQueries.scala [63:95]
def markJournalMessagesAsDeleted(persistenceId: String, maxSequenceNr: Long) =
JournalTable
.filter(_.persistenceId === persistenceId)
.filter(_.sequenceNumber <= maxSequenceNr)
.filter(_.deleted === false)
.map(_.deleted)
.update(true)
private def _highestSequenceNrForPersistenceId(persistenceId: Rep[String]): Rep[Option[Long]] =
selectAllJournalForPersistenceId(persistenceId).take(1).map(_.sequenceNumber).max
private def _highestMarkedSequenceNrForPersistenceId(persistenceId: Rep[String]): Rep[Option[Long]] =
selectAllJournalForPersistenceId(persistenceId).filter(_.deleted === true).take(1).map(_.sequenceNumber).max
val highestSequenceNrForPersistenceId = Compiled(_highestSequenceNrForPersistenceId _)
val highestMarkedSequenceNrForPersistenceId = Compiled(_highestMarkedSequenceNrForPersistenceId _)
private def _selectByPersistenceIdAndMaxSequenceNumber(persistenceId: Rep[String], maxSequenceNr: Rep[Long]) =
selectAllJournalForPersistenceIdDesc(persistenceId).filter(_.sequenceNumber <= maxSequenceNr)
val selectByPersistenceIdAndMaxSequenceNumber = Compiled(_selectByPersistenceIdAndMaxSequenceNumber _)
private def _allPersistenceIdsDistinct: Query[Rep[String], String, Seq] =
JournalTable.map(_.persistenceId).distinct
val allPersistenceIdsDistinct = Compiled(_allPersistenceIdsDistinct)
def journalRowByPersistenceIds(persistenceIds: Iterable[String]): Query[Rep[String], String, Seq] =
for {
query <- JournalTable.map(_.persistenceId)
if query.inSetBind(persistenceIds)
} yield query