def deleteMessages()

in core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/CassandraJournalStatements.scala [263:390]


  def deleteMessages(cassandra2xCompat: Boolean) =
    if (cassandra2xCompat)
      s"""
      DELETE FROM $tableName WHERE
        persistence_id = ? AND
        partition_nr = ? AND
        sequence_nr = ?
    """
    else
      s"""
      DELETE FROM $tableName WHERE
        persistence_id = ? AND
        partition_nr = ? AND
        sequence_nr >= 0 AND
        sequence_nr <= ?
    """

  def selectMessages =
    s"""
      SELECT * FROM $tableName WHERE
        persistence_id = ? AND
        partition_nr = ? AND
        sequence_nr >= ? AND
        sequence_nr <= ?
    """

  def selectHighestSequenceNr =
    s"""
     SELECT sequence_nr FROM $tableName WHERE
       persistence_id = ? AND
       partition_nr = ?
       ORDER BY sequence_nr
       DESC LIMIT 1
   """

  def selectDeletedTo =
    s"""
      SELECT deleted_to FROM $metadataTableName WHERE
        persistence_id = ?
    """

  def insertDeletedTo =
    s"""
      INSERT INTO $metadataTableName (persistence_id, deleted_to)
      VALUES ( ?, ? )
    """

  def deleteDeletedTo =
    s"""
      DELETE FROM $metadataTableName where persistence_id = ?
    """

  def insertIntoAllPersistenceIds =
    s"""
      INSERT INTO $allPersistenceIdsTableName (persistence_id)
      VALUES ( ? )
    """

  def deleteFromAllPersistenceIds =
    s"""
      DELETE FROM $allPersistenceIdsTableName where persistence_id = ?
    """

  protected def tableName = s"${journalSettings.keyspace}.${journalSettings.table}"
  private def tagTableName = s"${journalSettings.keyspace}.${eventsByTagSettings.tagTable.name}"
  private def tagProgressTableName = s"${journalSettings.keyspace}.tag_write_progress"
  private def tagScanningTableName = s"${journalSettings.keyspace}.tag_scanning"
  private def metadataTableName = s"${journalSettings.keyspace}.${journalSettings.metadataTable}"
  private def allPersistenceIdsTableName = s"${journalSettings.keyspace}.${journalSettings.allPersistenceIdsTable}"

  /**
   * Execute creation of keyspace and tables if that is enabled in config.
   * Avoid calling this from several threads at the same time to
   * reduce the risk of (annoying) "Column family ID mismatch" exception.
   *
   * Exceptions will be logged but will not fail the returned Future.
   */
  def executeCreateKeyspaceAndTables(session: CqlSession, log: LoggingAdapter)(
      implicit ec: ExecutionContext): Future[Done] = {

    def tagStatements: Future[Done] =
      if (eventsByTagSettings.eventsByTagEnabled) {
        for {
          _ <- session.executeAsync(createTagsTable).asScala
          _ <- session.executeAsync(createTagsProgressTable).asScala
          _ <- session.executeAsync(createTagScanningTable).asScala
        } yield Done
      } else FutureDone

    def keyspace: Future[Done] =
      if (journalSettings.keyspaceAutoCreate)
        session.executeAsync(createKeyspace).asScala.map(_ => Done)
      else FutureDone

    val done = if (journalSettings.tablesAutoCreate) {
      // reason for setSchemaMetadataEnabled is that it speed up tests by multiple factors
      session.setSchemaMetadataEnabled(false)
      val result = for {
        _ <- keyspace
        _ <- session.executeAsync(createTable).asScala
        _ <- session.executeAsync(createMetadataTable).asScala
        _ <- {
          if (settings.journalSettings.supportAllPersistenceIds)
            session.executeAsync(createAllPersistenceIdsTable).asScala
          else
            FutureDone
        }
        _ <- tagStatements
      } yield {
        session.setSchemaMetadataEnabled(null)
        Done
      }
      result.recoverWith {
        case e =>
          log.warning("Failed to create journal keyspace and tables: {}", e)
          session.setSchemaMetadataEnabled(null)
          FutureDone
      }
    } else {
      keyspace.recoverWith {
        case e =>
          log.warning("Failed to create journal keyspace: {}", e)
          FutureDone
      }
    }

    done
  }