in core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/CassandraJournal.scala [387:436]
private def statementGroup(atomicWrites: Seq[SerializedAtomicWrite]): Seq[Future[BoundStatement]] = {
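// Determine which journal partitions this group of atomic writes touches.
// Assumption (not shown in this excerpt): partitionNr maps a sequence number onto a
// fixed-size partition, roughly (sequenceNr - 1) / targetPartitionSize, so minPnr and
// maxPnr bracket the partitions of the first and last event in the group.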
val maxPnr = partitionNr(atomicWrites.last.payload.last.sequenceNr, journalSettings.targetPartitionSize)
val firstSeq = atomicWrites.head.payload.head.sequenceNr
val minPnr = partitionNr(firstSeq, journalSettings.targetPartitionSize)
val persistenceId: String = atomicWrites.head.persistenceId
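// every serialized event across all the atomic writes; each becomes its own bound insert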
val all = atomicWrites.flatMap(_.payload)
// reading assumes sequence numbers are in the right partition or partition + 1
// even if we did allow a wider span it would perform terribly, as large C* batches are not good
require(
maxPnr - minPnr <= 1,
"AtomicWrites spanning more than two partitions are not supported. Keep AtomicWrites <= max partition size.")
val writes: Seq[Future[BoundStatement]] = all.map { (m: Serialized) =>
// use two separate prepared statements, with and without the meta data columns,
// so that users who don't use the meta data feature don't have to alter the table
// to add the new columns
val stmt =
if (m.meta.isDefined) preparedWriteMessageWithMeta
else preparedWriteMessage
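// every event in the group is bound to the highest partition (maxPnr), even if its
// sequence number nominally falls in the previous partition; replay compensates by
// also checking partition + 1, per the comment above the require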
stmt.map { prepared =>
val bs = prepared
.bind()
.setString("persistence_id", persistenceId)
.setLong("partition_nr", maxPnr)
.setLong("sequence_nr", m.sequenceNr)
.setUuid("timestamp", m.timeUuid)
// Keeping as text for backward compatibility
.setString("timebucket", m.timeBucket.key.toString)
.setString("writer_uuid", m.writerUuid)
.setInt("ser_id", m.serId)
.setString("ser_manifest", m.serManifest)
.setString("event_manifest", m.eventAdapterManifest)
.setByteBuffer("event", m.serialized)
.setSet("tags", m.tags.asJava, classOf[String])
// meta data, if any
m.meta
.map(meta => {
bs.setInt("meta_ser_id", meta.serId)
.setString("meta_ser_manifest", meta.serManifest)
.setByteBuffer("meta", meta.serialized)
})
.getOrElse(bs)
}
}
writes
}
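// Not shown here: the caller presumably sequences these futures and executes the resulting
// bound statements, likely as a single statement or as a batch when an AtomicWrite contains
// several events (an assumption about the surrounding code, not visible in this excerpt).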