in core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala [286:326]
def deleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] = {
val entityType = PersistenceId.extractEntityType(persistenceId)
val slice = persistenceExt.sliceForPersistenceId(persistenceId)
val deleteMarkerSeqNrFut =
if (toSequenceNr == Long.MaxValue)
readHighestSequenceNr(persistenceId, 0L)
else
Future.successful(toSequenceNr)
deleteMarkerSeqNrFut.flatMap { deleteMarkerSeqNr =>
def bindDeleteMarker(stmt: Statement): Statement = {
stmt
.bind(0, slice)
.bind(1, entityType)
.bind(2, persistenceId)
.bind(3, deleteMarkerSeqNr)
.bind(4, "")
.bind(5, "")
.bind(6, 0)
.bind(7, "")
.bind(8, Array.emptyByteArray)
.bind(9, true)
}
val result = r2dbcExecutor.update(s"delete [$persistenceId]") { connection =>
Vector(
connection
.createStatement(deleteEventsSql)
.bind(0, persistenceId)
.bind(1, toSequenceNr),
bindDeleteMarker(connection.createStatement(insertDeleteMarkerSql)))
}
if (log.isDebugEnabled)
result.foreach(updatedRows =>
log.debug("Deleted [{}] events for persistenceId [{}]", updatedRows.head, persistenceId))
result.map(_ => ())(ExecutionContexts.parasitic)
}
}