in core/src/main/scala/org/apache/pekko/persistence/cassandra/query/scaladsl/CassandraReadJournal.scala [168:209]
/**
 * Requests each of the prepared statements listed below and completes with `Done`
 * once all of the resulting futures have completed successfully. If any of them
 * fails, the returned future fails as well.
 *
 * NOTE(review): `preparedSelectDistinctPersistenceIds` and `preparedSelectHighestSequenceNr`
 * are not requested here - presumably on purpose; confirm before adding them.
 */
def initialize(): Future[Done] = {
  // All `futureResult()` calls are issued up front, in the same order as before.
  val statementFutures = List(
    preparedSelectDeletedTo.futureResult(),
    preparedSelectAllPersistenceIds.futureResult(),
    preparedSelectEventsByPersistenceId.futureResult(),
    preparedSelectFromTagViewWithUpperBound.futureResult(),
    preparedSelectTagSequenceNrs.futureResult())

  // The individual statements are discarded; only overall completion matters.
  Future.sequence(statementFutures).map(_ => Done)
}
/**
 * Wraps the preparation of `statements.journalStatements.selectMessages`;
 * the prepared statement is obtained through `futureResult()`.
 */
private val preparedSelectEventsByPersistenceId: RetryableFutureEval[PreparedStatement] =
  RetryableFutureEval { () =>
    session.prepare(statements.journalStatements.selectMessages)
  }
/**
 * Wraps the preparation of `statements.journalStatements.selectDeletedTo`;
 * the prepared statement is obtained through `futureResult()`.
 */
private val preparedSelectDeletedTo: RetryableFutureEval[PreparedStatement] =
  RetryableFutureEval { () =>
    session.prepare(statements.journalStatements.selectDeletedTo)
  }
/**
 * Wraps the preparation of `queryStatements.selectAllPersistenceIds`;
 * the prepared statement is obtained through `futureResult()`.
 */
private val preparedSelectAllPersistenceIds: RetryableFutureEval[PreparedStatement] =
  RetryableFutureEval { () =>
    session.prepare(queryStatements.selectAllPersistenceIds)
  }
/**
 * Wraps the preparation of `queryStatements.selectDistinctPersistenceIds`;
 * the prepared statement is obtained through `futureResult()`.
 */
private val preparedSelectDistinctPersistenceIds: RetryableFutureEval[PreparedStatement] =
  RetryableFutureEval { () =>
    session.prepare(queryStatements.selectDistinctPersistenceIds)
  }
/**
 * Wraps the preparation of `queryStatements.selectEventsFromTagViewWithUpperBound`;
 * the prepared statement is obtained through `futureResult()`.
 */
private val preparedSelectFromTagViewWithUpperBound: RetryableFutureEval[PreparedStatement] =
  RetryableFutureEval { () =>
    session.prepare(queryStatements.selectEventsFromTagViewWithUpperBound)
  }
/**
 * Wraps the preparation of `queryStatements.selectTagSequenceNrs`;
 * the prepared statement is obtained through `futureResult()`.
 */
private val preparedSelectTagSequenceNrs: RetryableFutureEval[PreparedStatement] =
  RetryableFutureEval { () =>
    session.prepare(queryStatements.selectTagSequenceNrs)
  }
/**
 * Wraps the preparation of `statements.journalStatements.selectHighestSequenceNr`;
 * the prepared statement is obtained through `futureResult()`.
 */
private val preparedSelectHighestSequenceNr: RetryableFutureEval[PreparedStatement] =
  RetryableFutureEval { () =>
    session.prepare(statements.journalStatements.selectHighestSequenceNr)
  }
/**
 * INTERNAL API
 *
 * Bundles the select-events, select-highest-sequence-nr and select-deleted-to
 * prepared statements into a single [[CombinedEventsByPersistenceIdStmts]].
 *
 * The statements are requested one after another: each `futureResult()` is only
 * invoked once the previous future has completed successfully, so the first
 * failure short-circuits the rest.
 */
@InternalApi private[pekko] def combinedEventsByPersistenceIdStmts: Future[CombinedEventsByPersistenceIdStmts] =
  preparedSelectEventsByPersistenceId.futureResult().flatMap { selectEvents =>
    preparedSelectHighestSequenceNr.futureResult().flatMap { selectHighestNr =>
      preparedSelectDeletedTo.futureResult().map { selectDeletedTo =>
        CombinedEventsByPersistenceIdStmts(selectEvents, selectHighestNr, selectDeletedTo)
      }
    }
  }