in core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/CassandraJournal.scala [538:641]
/**
 * Deletes the events of `persistenceId` up to `toSequenceNr`.
 *
 * Steps, in order:
 *  1. look up the highest sequence number that has already been deleted,
 *  2. if `toSequenceNr` is `Long.MaxValue`, look up the actual highest sequence number,
 *  3. record the logical delete (unless the same or a higher sequence number was already deleted),
 *  4. physically delete the rows from the affected partitions.
 *
 * When the whole operation completes, successfully or not, a `DeleteFinished` message is sent to
 * `self` so that pending deletes can be started.
 *
 * @param persistenceId persistence id whose events are deleted
 * @param toSequenceNr  upper bound of the delete; `Long.MaxValue` means "delete all events"
 * @return a future that completes when the logical and physical deletes are done, or fails with
 *         the first failure of any step
 */
private def delete(persistenceId: String, toSequenceNr: Long): Future[Unit] = {

  // Single place for the warning emitted when a physical delete fails, shared by both
  // physical delete strategies below.
  def logDeleteFailure(e: Throwable): Unit =
    log.warning(
      "Unable to complete deletes for persistence id {}, toSequenceNr {}. " +
      "The plugin will continue to function correctly but you will need to manually delete the old messages. " +
      "Caused by: [{}: {}]",
      persistenceId,
      toSequenceNr,
      e.getClass.getName,
      e.getMessage)

  // Physically deletes rows in partitions `lowestPartition` to `highestPartition` (inclusive).
  // Uses the per-message batch strategy when the server reports version 2 or `settings.cosmosDb`
  // is set, otherwise executes one delete statement per partition bound to `toSeqNr`.
  def physicalDelete(lowestPartition: Long, highestPartition: Long, toSeqNr: Long): Future[Done] = {
    session.serverMetaData.flatMap { meta =>
      if (meta.isVersion2 || settings.cosmosDb) {
        physicalDelete2xCompat(lowestPartition, highestPartition, toSeqNr)
      } else {
        val deleteResult =
          Future.sequence((lowestPartition to highestPartition).map { partition =>
            val boundDeleteMessages =
              preparedDeleteMessages.futureResult().map(_.bind(persistenceId, partition: JLong, toSeqNr: JLong))
            boundDeleteMessages.flatMap(execute(_))
          })
        deleteResult.failed.foreach(e => logDeleteFailure(e))
        deleteResult.map(_ => Done)(ExecutionContexts.parasitic)
      }
    }
  }

  // Compatibility strategy: for every partition, looks up its partition info and deletes the
  // sequence numbers `minSequenceNr` to `maxSequenceNr` one statement per message, grouped into
  // batches of at most `journalSettings.maxMessageBatchSize` statements.
  // NOTE(review): presumably needed because the per-partition delete used above is not supported
  // on those servers - confirm against the prepared statement definition.
  def physicalDelete2xCompat(lowestPartition: Long, highestPartition: Long, toSeqNr: Long): Future[Done] = {

    // Binds one delete statement per message id and executes them as a single batch.
    def asyncDeleteMessages(partition: Long, messageIds: Seq[MessageId]): Future[Unit] = {
      val boundStatements = messageIds.map(mid =>
        preparedDeleteMessages.futureResult().map(_.bind(mid.persistenceId, partition: JLong,
          mid.sequenceNr: JLong)))
      Future.sequence(boundStatements).flatMap { stmts =>
        executeBatch(batch => stmts.foldLeft(batch) { case (acc, next) => acc.add(next) })
      }
    }

    val partitionInfos = (lowestPartition to highestPartition).map(partitionInfo(persistenceId, _, toSeqNr))
    val deleteResult =
      Future.sequence(partitionInfos.map(partitionInfoFuture =>
        partitionInfoFuture.flatMap(pi => {
          Future.sequence((pi.minSequenceNr to pi.maxSequenceNr).grouped(journalSettings.maxMessageBatchSize).map {
            group =>
              {
                val groupDeleteResult =
                  asyncDeleteMessages(pi.partitionNr, group.map(MessageId(persistenceId, _)))
                // A warning is logged for every batch that fails, not once per delete request.
                groupDeleteResult.failed.foreach(e => logDeleteFailure(e))
                groupDeleteResult
              }
          })
        })))
    deleteResult.map(_ => Done)(ExecutionContexts.parasitic)
  }

  // Deletes the events by inserting into the metadata table deleted_to and physically deletes the rows.
  def logicalAndPhysicalDelete(highestDeletedSequenceNumber: Long, highestSequenceNr: Long): Future[Done] = {
    val lowestPartition = partitionNr(highestDeletedSequenceNumber + 1, journalSettings.targetPartitionSize)
    // Never delete beyond the highest sequence number that exists.
    val toSeqNr = math.min(toSequenceNr, highestSequenceNr)
    val highestPartition = partitionNr(toSeqNr, journalSettings.targetPartitionSize) + 1 // may have been moved to the next partition
    val logicalDelete =
      if (toSeqNr <= highestDeletedSequenceNumber) {
        // already deleted same or higher sequence number, don't update highestDeletedSequenceNumber,
        // but perform the physical delete (again), may be a retry request
        FutureUnit
      } else {
        val boundInsertDeletedTo =
          preparedInsertDeletedTo.futureResult().map(_.bind(persistenceId, toSeqNr: JLong))
        boundInsertDeletedTo.flatMap(execute)
      }
    // The physical delete only starts after the logical delete has completed successfully.
    logicalDelete.flatMap(_ => physicalDelete(lowestPartition, highestPartition, toSeqNr))
  }

  val deleteResult = for {
    highestDeletedSequenceNumber <- asyncHighestDeletedSequenceNumber(persistenceId)
    highestSequenceNr <- {
      // MaxValue may be used as magic value to delete all events without specifying actual toSequenceNr
      if (toSequenceNr == Long.MaxValue)
        asyncFindHighestSequenceNr(persistenceId, highestDeletedSequenceNumber, journalSettings.targetPartitionSize)
      else
        Future.successful(toSequenceNr)
    }
    _ <- logicalAndPhysicalDelete(highestDeletedSequenceNumber, highestSequenceNr)
  } yield ()

  // Kick off any pending deletes when finished.
  deleteResult.onComplete { result =>
    self ! DeleteFinished(persistenceId, toSequenceNr, result)
  }
  deleteResult
}