in core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/CassandraJournalStatements.scala [263:390]
/**
 * CQL for deleting journal rows within one partition of a persistence id.
 *
 * Two forms exist:
 *  - compat form: deletes the single row whose `sequence_nr` equals the bound value;
 *  - range form: deletes every row with `sequence_nr` from 0 up to and including the bound value.
 *
 * Bind markers, in order: persistence_id, partition_nr, sequence_nr.
 *
 * NOTE(review): the compat form presumably exists because range deletes are not
 * available on Cassandra 2.x; confirm against the supported server versions.
 *
 * @param cassandra2xCompat `true` selects the single-row form, `false` the range form
 * @return the CQL statement text
 */
def deleteMessages(cassandra2xCompat: Boolean): String = {
  // Local defs keep the interpolation lazy: only the selected statement is built.
  def singleRowDelete: String =
    s"""
DELETE FROM $tableName WHERE
persistence_id = ? AND
partition_nr = ? AND
sequence_nr = ?
"""

  def rangeDelete: String =
    s"""
DELETE FROM $tableName WHERE
persistence_id = ? AND
partition_nr = ? AND
sequence_nr >= 0 AND
sequence_nr <= ?
"""

  if (cassandra2xCompat) singleRowDelete else rangeDelete
}
/**
 * CQL for reading all columns of the journal rows in one partition of a persistence id
 * whose `sequence_nr` lies in an inclusive range.
 *
 * Bind markers, in order: persistence_id, partition_nr, lower sequence_nr bound,
 * upper sequence_nr bound.
 */
def selectMessages: String =
  s"""
SELECT * FROM $tableName WHERE
persistence_id = ? AND
partition_nr = ? AND
sequence_nr >= ? AND
sequence_nr <= ?
"""
/**
 * CQL for reading the highest `sequence_nr` stored in one partition of a persistence id.
 *
 * Rows are ordered by `sequence_nr` descending and limited to one, so at most a single
 * row is returned.
 *
 * Bind markers, in order: persistence_id, partition_nr.
 */
def selectHighestSequenceNr: String =
  s"""
SELECT sequence_nr FROM $tableName WHERE
persistence_id = ? AND
partition_nr = ?
ORDER BY sequence_nr
DESC LIMIT 1
"""
/**
 * CQL for reading the `deleted_to` column of a persistence id from the metadata table.
 *
 * Bind markers, in order: persistence_id.
 */
def selectDeletedTo: String =
  s"""
SELECT deleted_to FROM $metadataTableName WHERE
persistence_id = ?
"""
/**
 * CQL for writing the `deleted_to` value of a persistence id into the metadata table.
 *
 * Bind markers, in order: persistence_id, deleted_to.
 */
def insertDeletedTo: String =
  s"""
INSERT INTO $metadataTableName (persistence_id, deleted_to)
VALUES ( ?, ? )
"""
/**
 * CQL for removing a persistence id from the metadata table.
 *
 * The statement deletes by `persistence_id` only, so it is not limited to the
 * `deleted_to` column.
 *
 * Bind markers, in order: persistence_id.
 */
def deleteDeletedTo: String =
  s"""
DELETE FROM $metadataTableName where persistence_id = ?
"""
/**
 * CQL for registering a persistence id in the all-persistence-ids table.
 *
 * Bind markers, in order: persistence_id.
 */
def insertIntoAllPersistenceIds: String =
  s"""
INSERT INTO $allPersistenceIdsTableName (persistence_id)
VALUES ( ? )
"""
/**
 * CQL for removing a persistence id from the all-persistence-ids table.
 *
 * Bind markers, in order: persistence_id.
 */
def deleteFromAllPersistenceIds: String =
  s"""
DELETE FROM $allPersistenceIdsTableName where persistence_id = ?
"""
// Keyspace-qualified table names (`<keyspace>.<table>`) used by the CQL statements.
// Every name lives in the journal keyspace.

/** Journal (messages) table. Protected rather than private, so subclasses can see it. */
protected def tableName: String =
  s"${journalSettings.keyspace}.${journalSettings.table}"

/** Tag table; its name is taken from the events-by-tag settings. */
private def tagTableName: String =
  s"${journalSettings.keyspace}.${eventsByTagSettings.tagTable.name}"

/** Tag write progress table; the table name is fixed. */
private def tagProgressTableName: String =
  s"${journalSettings.keyspace}.tag_write_progress"

/** Tag scanning table; the table name is fixed. */
private def tagScanningTableName: String =
  s"${journalSettings.keyspace}.tag_scanning"

/** Metadata table, which holds `deleted_to` per persistence id. */
private def metadataTableName: String =
  s"${journalSettings.keyspace}.${journalSettings.metadataTable}"

/** Table listing all persistence ids. */
private def allPersistenceIdsTableName: String =
  s"${journalSettings.keyspace}.${journalSettings.allPersistenceIdsTable}"
/**
 * Execute creation of keyspace and tables if that is enabled in config.
 *
 * The statements run strictly one after another: the keyspace (when `keyspaceAutoCreate`
 * is set) and then, when `tablesAutoCreate` is set, the journal table, the metadata table,
 * the all-persistence-ids table (when supported) and finally the tag tables (when
 * events-by-tag is enabled). Without `tablesAutoCreate` only the keyspace step runs.
 *
 * Avoid calling this from several threads at the same time to
 * reduce the risk of (annoying) "Column family ID mismatch" exception.
 *
 * Exceptions will be logged but will not fail the returned Future.
 *
 * @param session session the statements are executed on
 * @param log adapter that receives a warning when a step fails
 * @param ec execution context used to chain the asynchronous steps
 * @return a Future completed with `Done` once all enabled steps have run or a failure was logged
 */
def executeCreateKeyspaceAndTables(session: CqlSession, log: LoggingAdapter)(
    implicit ec: ExecutionContext): Future[Done] = {

  // Keyspace creation; a no-op unless keyspace auto-creation is enabled.
  def keyspace: Future[Done] =
    if (!journalSettings.keyspaceAutoCreate) FutureDone
    else session.executeAsync(createKeyspace).asScala.map(_ => Done)

  // Optional table that lists all persistence ids.
  def allPersistenceIdsTable: Future[Any] =
    if (settings.journalSettings.supportAllPersistenceIds)
      session.executeAsync(createAllPersistenceIdsTable).asScala
    else
      FutureDone

  // The three tag related tables, created in sequence; a no-op unless events-by-tag is enabled.
  def tagTables: Future[Done] =
    if (!eventsByTagSettings.eventsByTagEnabled) FutureDone
    else
      session
        .executeAsync(createTagsTable)
        .asScala
        .flatMap(_ => session.executeAsync(createTagsProgressTable).asScala)
        .flatMap(_ => session.executeAsync(createTagScanningTable).asScala)
        .map(_ => Done)

  if (journalSettings.tablesAutoCreate) {
    // Schema metadata is switched off while the DDL runs; it speeds up tests by multiple factors.
    session.setSchemaMetadataEnabled(false)

    val allCreated = for {
      _ <- keyspace
      _ <- session.executeAsync(createTable).asScala
      _ <- session.executeAsync(createMetadataTable).asScala
      _ <- allPersistenceIdsTable
      _ <- tagTables
    } yield {
      // Passing null hands the schema metadata decision back to the driver configuration.
      session.setSchemaMetadataEnabled(null)
      Done
    }

    // Best effort: log the failure, undo the schema metadata override, complete successfully.
    allCreated.recoverWith {
      case e =>
        log.warning("Failed to create journal keyspace and tables: {}", e)
        session.setSchemaMetadataEnabled(null)
        FutureDone
    }
  } else {
    // Only the keyspace step applies; failures are logged and swallowed here as well.
    keyspace.recoverWith {
      case e =>
        log.warning("Failed to create journal keyspace: {}", e)
        FutureDone
    }
  }
}