in core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/CassandraJournal.scala [502:531]
/**
 * Requests deletion of the messages of `persistenceId` up to `toSequenceNr`.
 *
 * Deletes for one persistenceId are tracked in `pendingDeletes`:
 *  - when nothing is registered for the persistenceId, the delete is registered and started
 *    immediately, and its outcome is sent back to this actor as a `DeleteFinished` message;
 *  - otherwise the delete is appended to the list already registered for the persistenceId,
 *    unless that list is longer than `maxConcurrentDeletes`, in which case the request is
 *    rejected.
 *
 * NOTE(review): this relies on the `DeleteFinished` handler (outside this block) to complete
 * the promise of the finished delete, start the next queued one and clear the
 * `pendingDeletes` entry -- confirm against the handler.
 *
 * @param persistenceId id of the persistent entity whose messages are deleted
 * @param toSequenceNr  sequence number up to which messages are deleted
 * @return the future of the promise stored with the pending delete, or an already failed
 *         future (RuntimeException) when too many deletes are outstanding
 */
override def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] = {
  // TODO could "optimize" away deletes that overlap?
  pendingDeletes.get(persistenceId) match {
    case null =>
      log.debug("[{}] No outstanding delete. Sequence nr [{}]", persistenceId, toSequenceNr)
      // fast path, no outstanding deletes for this persistenceId
      val p = Promise[Unit]()
      pendingDeletes.put(persistenceId, List(PendingDelete(persistenceId, toSequenceNr, p)))
      // Report the outcome back to the actor instead of dropping the Future: without this
      // nothing ever completes `p` or removes the pendingDeletes entry, so the caller's
      // future hangs and every later delete for this persistenceId queues up forever.
      // Sending a message (rather than touching pendingDeletes in the callback) keeps all
      // access to pendingDeletes on the actor's own message processing.
      // Requires an implicit ExecutionContext in scope for onComplete.
      delete(persistenceId, toSequenceNr).onComplete { result =>
        self ! DeleteFinished(persistenceId, toSequenceNr, result)
      }
      p.future
    case otherDeletes =>
      if (otherDeletes.length > settings.journalSettings.maxConcurrentDeletes) {
        log.error(
          "[{}] Over [{}] outstanding deletes. Failing delete",
          persistenceId,
          settings.journalSettings.maxConcurrentDeletes)
        Future.failed(new RuntimeException(
          s"Over ${settings.journalSettings.maxConcurrentDeletes} outstanding deletes for persistenceId $persistenceId"))
      } else {
        log.debug(
          "[{}] outstanding delete. Delete to seqNr [{}] will be scheduled after previous one finished.",
          persistenceId,
          toSequenceNr)
        // Only register the request here; it is not started by this method.
        val p = Promise[Unit]()
        pendingDeletes.put(persistenceId, otherDeletes :+ PendingDelete(persistenceId, toSequenceNr, p))
        p.future
      }
  }
}