in core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/CassandraJournal.scala [362:385]
private def writeMessages(atomicWrites: Seq[SerializedAtomicWrite]): Future[Unit] = {
// insert into the all_persistence_ids table for the first event, used by persistenceIds query
require(atomicWrites.nonEmpty)
require(atomicWrites.head.payload.nonEmpty)
val allPersistenceId =
if (settings.journalSettings.supportAllPersistenceIds && atomicWrites.head.payload.head.sequenceNr == 1L)
preparedInsertIntoAllPersistenceIds.map(_.bind(atomicWrites.head.persistenceId)).flatMap(execute(_))
else
FutureUnit
val boundStatements: Seq[Future[BoundStatement]] = statementGroup(atomicWrites)
allPersistenceId.flatMap { _ =>
boundStatements.size match {
case 1 =>
boundStatements.head.flatMap(execute(_))
case 0 => FutureUnit
case _ =>
Future.sequence(boundStatements).flatMap { stmts =>
executeBatch(batch => stmts.foldLeft(batch) { case (acc, next) => acc.add(next) })
}
}
}
}