final def add()

in core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/Buffer.scala [62:105]


  /**
   * Returns a new buffer that also contains `write`.
   *
   * A write whose first and last events carry different time buckets is divided by time bucket
   * and each part is added in turn; only the final part keeps the original ack. Otherwise the
   * write is appended to `pending` or `nextBatch`, depending on whether a write is already
   * required, whether it sorts before the oldest buffered write, whether its time bucket differs
   * from the one at the head of `nextBatch`, and whether it reaches or exceeds `batchSize`.
   *
   * @param write the write to add; `write.events` must be non-empty because its head and last
   *              elements are read unconditionally
   * @return the updated buffer
   */
  final def add(write: AwaitingWrite): Buffer = {
    val incoming = write.events
    val headEvent = incoming.head._1
    val headBucket = headEvent.timeBucket

    if (headBucket != incoming.last._1.timeBucket) {
      // The write straddles time buckets: peel off the events that belong to the first bucket and
      // add the remainder separately. The ack is attached to the last part only.
      val (sameBucket, otherBuckets) = incoming.partition(_._1.timeBucket == headBucket)
      val withFirstPart = add(AwaitingWrite(sameBucket, OptionVal.None))
      withFirstPart.add(AwaitingWrite(otherBuckets, write.ack))
    } else {
      // Common case: every event in the write shares a single time bucket.
      val grownSize = size + incoming.size

      // True when the new write's first event sorts before the first event of the oldest
      // write in nextBatch.
      def arrivedOutOfOrder: Boolean =
        nextBatch.headOption.exists { oldest =>
          UUIDComparator.comparator.compare(headEvent.timeUuid, oldest.events.head._1.timeUuid) < 0
        }

      // True when nextBatch is non-empty and its head is in a different time bucket.
      def bucketChanged: Boolean =
        nextBatch.headOption.exists(_.events.head._1.timeBucket != headBucket)

      if (writeRequired) {
        // A write is already due: queue behind it, time bucket changes are detected later.
        copy(size = grownSize, pending = pending :+ write)
      } else if (arrivedOutOfOrder) {
        // Rare: events were received out of order, so sort everything and rebuild the buffer.
        require(pending.isEmpty)
        val sortedWrites = (nextBatch :+ write).sortBy(_.events.head._1.timeUuid)(timeUuidOrdering)
        rebuild(sortedWrites)
      } else if (bucketChanged) {
        // The new write starts a new time bucket; the current batch must be written first.
        copy(size = grownSize, pending = pending :+ write, writeRequired = true)
      } else if (grownSize < batchSize) {
        // Still room in the batch and nothing forces a write yet.
        copy(size = grownSize, nextBatch = nextBatch :+ write)
      } else {
        require(pending.isEmpty, "Pending should be empty if write not required")
        if (grownSize == batchSize) {
          // The write fills the batch exactly.
          copy(size = grownSize, nextBatch = nextBatch :+ write, writeRequired = true)
        } else {
          // The write overflows the batch: only the events that still fit go into nextBatch, the
          // remainder (carrying the ack) becomes the sole pending write.
          val remainingCapacity = batchSize - size
          val (fits, overflow) = incoming.splitAt(remainingCapacity)
          copy(
            size = grownSize,
            nextBatch = nextBatch :+ AwaitingWrite(fits, OptionVal.None),
            pending = Vector(AwaitingWrite(overflow, write.ack)),
            writeRequired = true)
        }
      }
    }
  }