override def asyncDeleteMessagesTo()

in core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/CassandraJournal.scala [502:531]


  override def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] = {

    // TODO could "optimize" away deletes that overlap?
    pendingDeletes.get(persistenceId) match {
      case null =>
        log.debug("[{}] No outstanding delete. Sequence nr [{}]", persistenceId, toSequenceNr)
        // fast path, no outstanding deletes for this persistenceId
        val p = Promise[Unit]()
        pendingDeletes.put(persistenceId, List(PendingDelete(persistenceId, toSequenceNr, p)))
        delete(persistenceId, toSequenceNr)
        p.future
      case otherDeletes =>
        if (otherDeletes.length > settings.journalSettings.maxConcurrentDeletes) {
          log.error(
            "[{}] Over [{}] outstanding deletes. Failing delete",
            persistenceId,
            settings.journalSettings.maxConcurrentDeletes)
          Future.failed(new RuntimeException(
            s"Over ${settings.journalSettings.maxConcurrentDeletes} outstanding deletes for persistenceId $persistenceId"))
        } else {
          log.debug(
            "[{}] outstanding delete. Delete to seqNr [{}] will be scheduled after previous one finished.",
            persistenceId,
            toSequenceNr)
          val p = Promise[Unit]()
          pendingDeletes.put(persistenceId, otherDeletes :+ PendingDelete(persistenceId, toSequenceNr, p))
          p.future
        }
    }
  }