def writeBatch()

in core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/TagWriters.scala [69:114]


    /**
     * Writes the events of `events.nextBatch` for `tag` as one unlogged batch.
     *
     * Each event is bound to one of two prepared statements, chosen by whether the event
     * carries metadata; for events with metadata the three `meta*` columns are set as well.
     * The resulting batch is built and passed to `executeWrite`.
     *
     * @param tag    value bound as the first parameter of every statement in the batch
     * @param events buffer whose `nextBatch` supplies the pending writes to include
     * @param ec     execution context used for the future transformations in this method
     * @return the future returned by `executeWrite` for the built batch; it is failed if
     *         obtaining either prepared statement fails
     */
    def writeBatch(tag: Tag, events: Buffer)(implicit ec: ExecutionContext): Future[Done] = {
      // The builder is mutable: statements are accumulated into it from the callback below.
      val batchBuilder = new BatchStatementBuilder(BatchType.UNLOGGED)
      batchBuilder.setExecutionProfileName(writeProfile)

      // The "without meta" statement is only requested once the "with meta" one is available.
      val preparedPair =
        taggedPreparedStatements.WriteTagViewWithMeta.flatMap { withMeta =>
          taggedPreparedStatements.WriteTagViewWithoutMeta.map(withoutMeta => (withMeta, withoutMeta))
        }

      val builtBatch = preparedPair.map {
        case (withMeta, withoutMeta) =>
          for {
            pending <- events.nextBatch
            entry <- pending.events
          } {
            val (event, pidTagSequenceNr) = entry

            // Events carrying metadata need the statement that has the extra meta columns.
            val prepared = event.meta match {
              case Some(_) => withMeta
              case None    => withoutMeta
            }

            val boundCore = prepared.bind(
              tag,
              event.timeBucket.key: JLong,
              event.timeUuid,
              pidTagSequenceNr: JLong,
              event.serialized,
              event.eventAdapterManifest,
              event.persistenceId,
              event.sequenceNr: JLong,
              event.serId: JInt,
              event.serManifest,
              event.writerUuid)

            // Without metadata the statement is used as bound; otherwise fill in the meta columns.
            val statement = event.meta.fold(boundCore) { m =>
              boundCore
                .setByteBuffer("meta", m.serialized)
                .setString("meta_ser_manifest", m.serManifest)
                .setInt("meta_ser_id", m.serId)
            }

            batchBuilder.addStatement(statement)
          }
          batchBuilder.build()
      }

      builtBatch.flatMap(executeWrite)
    }