in core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/TagWriters.scala [67:112]
def writeBatch(tag: Tag, events: Buffer)(implicit ec: ExecutionContext): Future[Done] = {
val batch = new BatchStatementBuilder(BatchType.UNLOGGED)
batch.setExecutionProfileName(writeProfile)
val tagWritePSs = for {
withMeta <- taggedPreparedStatements.WriteTagViewWithMeta.futureResult()
withoutMeta <- taggedPreparedStatements.WriteTagViewWithoutMeta.futureResult()
} yield (withMeta, withoutMeta)
tagWritePSs
.map {
case (withMeta, withoutMeta) =>
events.nextBatch.foreach { awaitingWrite =>
awaitingWrite.events.foreach {
case (event, pidTagSequenceNr) =>
val ps = if (event.meta.isDefined) withMeta else withoutMeta
val bound = ps.bind(
tag,
event.timeBucket.key: JLong,
event.timeUuid,
pidTagSequenceNr: JLong,
event.serialized,
event.eventAdapterManifest,
event.persistenceId,
event.sequenceNr: JLong,
event.serId: JInt,
event.serManifest,
event.writerUuid)
val finished = event.meta match {
case Some(m) =>
bound
.setByteBuffer("meta", m.serialized)
.setString("meta_ser_manifest", m.serManifest)
.setInt("meta_ser_id", m.serId)
case None =>
bound
}
// this is a mutable builder
batch.addStatement(finished)
}
}
batch.build()
}
.flatMap(executeWrite)
}