private def writeMessages()

in core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/CassandraJournal.scala [362:385]


  /**
   * Executes the statements produced by `statementGroup` for the given atomic writes.
   *
   * When `supportAllPersistenceIds` is enabled and the very first event of the first
   * write has sequence number 1, the persistence id is first inserted into the
   * all_persistence_ids table (used by the persistenceIds query); the journal
   * statements are only executed once that insert has completed successfully.
   *
   * @param atomicWrites the serialized writes to store; must be non-empty and the
   *                     first write must carry at least one event
   * @return a future that completes when the statements have been executed
   * @throws IllegalArgumentException if `atomicWrites` or the payload of its first
   *                                  element is empty
   */
  private def writeMessages(atomicWrites: Seq[SerializedAtomicWrite]): Future[Unit] = {
    require(atomicWrites.nonEmpty)
    val firstWrite = atomicWrites.head
    require(firstWrite.payload.nonEmpty)

    // Only the first event of a persistence id (sequenceNr 1) needs to be registered.
    val shouldRegisterPersistenceId =
      settings.journalSettings.supportAllPersistenceIds && firstWrite.payload.head.sequenceNr == 1L

    val persistenceIdRegistered =
      if (shouldRegisterPersistenceId)
        preparedInsertIntoAllPersistenceIds
          .map(prepared => prepared.bind(firstWrite.persistenceId))
          .flatMap(bound => execute(bound))
      else
        FutureUnit

    // The statement futures are created up front, before the registration above has completed.
    val pendingStatements: Seq[Future[BoundStatement]] = statementGroup(atomicWrites)

    persistenceIdRegistered.flatMap { _ =>
      pendingStatements match {
        case Seq() =>
          FutureUnit
        case Seq(onlyStatement) =>
          // A single statement is executed directly, without a batch.
          onlyStatement.flatMap(bound => execute(bound))
        case severalStatements =>
          // Several statements are combined into one batch once all of them are bound.
          Future.sequence(severalStatements).flatMap { bound =>
            executeBatch(batch => bound.foldLeft(batch)((acc, statement) => acc.add(statement)))
          }
      }
    }
  }