private def delete()

in core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/CassandraJournal.scala [533:635]


  private def delete(persistenceId: String, toSequenceNr: Long): Future[Unit] = {

    def physicalDelete(lowestPartition: Long, highestPartition: Long, toSeqNr: Long): Future[Done] = {
      session.serverMetaData.flatMap { meta =>
        if (meta.isVersion2 || settings.cosmosDb) {
          physicalDelete2xCompat(lowestPartition, highestPartition, toSeqNr)
        } else {
          val deleteResult =
            Future.sequence((lowestPartition to highestPartition).map { partitionNr =>
              val boundDeleteMessages =
                preparedDeleteMessages.map(_.bind(persistenceId, partitionNr: JLong, toSeqNr: JLong))
              boundDeleteMessages.flatMap(execute(_))
            })
          deleteResult.failed.foreach { e =>
            log.warning(
              "Unable to complete deletes for persistence id {}, toSequenceNr {}. " +
              "The plugin will continue to function correctly but you will need to manually delete the old messages. " +
              "Caused by: [{}: {}]",
              persistenceId,
              toSequenceNr,
              e.getClass.getName,
              e.getMessage)
          }
          deleteResult.map(_ => Done)(ExecutionContexts.parasitic)
        }
      }
    }

    def physicalDelete2xCompat(
        lowestPartition: TagPidSequenceNr,
        highestPartition: TagPidSequenceNr,
        toSeqNr: TagPidSequenceNr): Future[Done] = {
      def asyncDeleteMessages(partitionNr: TagPidSequenceNr, messageIds: Seq[MessageId]): Future[Unit] = {
        val boundStatements = messageIds.map(mid =>
          preparedDeleteMessages.map(_.bind(mid.persistenceId, partitionNr: JLong, mid.sequenceNr: JLong)))
        Future.sequence(boundStatements).flatMap { stmts =>
          executeBatch(batch => stmts.foldLeft(batch) { case (acc, next) => acc.add(next) })
        }
      }

      val partitionInfos = (lowestPartition to highestPartition).map(partitionInfo(persistenceId, _, toSeqNr))
      val deleteResult =
        Future.sequence(partitionInfos.map(future =>
          future.flatMap(pi => {
            Future.sequence((pi.minSequenceNr to pi.maxSequenceNr).grouped(journalSettings.maxMessageBatchSize).map {
              group =>
                {
                  val groupDeleteResult =
                    asyncDeleteMessages(pi.partitionNr, group.map(MessageId(persistenceId, _)))
                  groupDeleteResult.failed.foreach { e =>
                    log.warning(
                      s"Unable to complete deletes for persistence id {}, toSequenceNr {}. " +
                      "The plugin will continue to function correctly but you will need to manually delete the old messages. " +
                      "Caused by: [{}: {}]",
                      persistenceId,
                      toSequenceNr,
                      e.getClass.getName,
                      e.getMessage)
                  }
                  groupDeleteResult
                }
            })
          })))
      deleteResult.map(_ => Done)(ExecutionContexts.parasitic)
    }

    // Deletes the events by inserting into the metadata table deleted_to and physically deletes the rows.
    def logicalAndPhysicalDelete(highestDeletedSequenceNumber: Long, highestSequenceNr: Long): Future[Done] = {
      val lowestPartition = partitionNr(highestDeletedSequenceNumber + 1, journalSettings.targetPartitionSize)
      val toSeqNr = math.min(toSequenceNr, highestSequenceNr)
      val highestPartition = partitionNr(toSeqNr, journalSettings.targetPartitionSize) + 1 // may have been moved to the next partition
      val logicalDelete =
        if (toSeqNr <= highestDeletedSequenceNumber) {
          // already deleted same or higher sequence number, don't update highestDeletedSequenceNumber,
          // but perform the physical delete (again), may be a retry request
          FutureUnit
        } else {
          val boundInsertDeletedTo =
            preparedInsertDeletedTo.map(_.bind(persistenceId, toSeqNr: JLong))
          boundInsertDeletedTo.flatMap(execute)
        }
      logicalDelete.flatMap(_ => physicalDelete(lowestPartition, highestPartition, toSeqNr))
    }

    val deleteResult = for {
      highestDeletedSequenceNumber <- asyncHighestDeletedSequenceNumber(persistenceId)
      highestSequenceNr <- {
        // MaxValue may be used as magic value to delete all events without specifying actual toSequenceNr
        if (toSequenceNr == Long.MaxValue)
          asyncFindHighestSequenceNr(persistenceId, highestDeletedSequenceNumber, journalSettings.targetPartitionSize)
        else
          Future.successful(toSequenceNr)
      }
      _ <- logicalAndPhysicalDelete(highestDeletedSequenceNumber, highestSequenceNr)
    } yield ()

    // Kick off any pending deletes when finished.
    deleteResult.onComplete { result =>
      self ! DeleteFinished(persistenceId, toSequenceNr, result)
    }

    deleteResult
  }