private def statementGroup()

in core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/CassandraJournal.scala [387:436]


  private def statementGroup(atomicWrites: Seq[SerializedAtomicWrite]): Seq[Future[BoundStatement]] = {
    val maxPnr = partitionNr(atomicWrites.last.payload.last.sequenceNr, journalSettings.targetPartitionSize)
    val firstSeq = atomicWrites.head.payload.head.sequenceNr
    val minPnr = partitionNr(firstSeq, journalSettings.targetPartitionSize)
    val persistenceId: String = atomicWrites.head.persistenceId
    val all = atomicWrites.flatMap(_.payload)

    // reading assumes sequence numbers are in the right partition or partition + 1
    // even if we did allow this it would perform terribly as large C* batches are not good
    require(
      maxPnr - minPnr <= 1,
      "Do not support AtomicWrites that span 3 partitions. Keep AtomicWrites <= max partition size.")

    val writes: Seq[Future[BoundStatement]] = all.map { m: Serialized =>
      // using two separate statements with or without the meta data columns because
      // then users doesn't have to alter table and add the new columns if they don't use
      // the meta data feature
      val stmt =
        if (m.meta.isDefined) preparedWriteMessageWithMeta
        else preparedWriteMessage

      stmt.map { stmt =>
        val bs = stmt
          .bind()
          .setString("persistence_id", persistenceId)
          .setLong("partition_nr", maxPnr)
          .setLong("sequence_nr", m.sequenceNr)
          .setUuid("timestamp", m.timeUuid)
          // Keeping as text for backward compatibility
          .setString("timebucket", m.timeBucket.key.toString)
          .setString("writer_uuid", m.writerUuid)
          .setInt("ser_id", m.serId)
          .setString("ser_manifest", m.serManifest)
          .setString("event_manifest", m.eventAdapterManifest)
          .setByteBuffer("event", m.serialized)
          .setSet("tags", m.tags.asJava, classOf[String])

        // meta data, if any
        m.meta
          .map(meta => {
            bs.setInt("meta_ser_id", meta.serId)
              .setString("meta_ser_manifest", meta.serManifest)
              .setByteBuffer("meta", meta.serialized)
          })
          .getOrElse(bs)
      }
    }

    writes
  }