in core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/Buffer.scala [62:105]
/**
 * Adds a write to the buffer, returning the updated (immutable) buffer.
 *
 * Maintains the invariants that `nextBatch` never exceeds `batchSize` events,
 * never mixes time buckets, and stays ordered by time UUID; anything that
 * cannot go into the current batch is parked in `pending` with
 * `writeRequired = true` so the current batch is flushed first.
 *
 * When a write has to be split, only the final fragment keeps `write.ack` —
 * presumably so the sender is acknowledged once, after ALL of its events have
 * been written (NOTE(review): inferred from the splitting pattern; confirm
 * against the ack handling at the flush site).
 */
final def add(write: AwaitingWrite): Buffer = {
  val firstTimeBucket = write.events.head._1.timeBucket
  val lastTimeBucket = write.events.last._1.timeBucket
  if (firstTimeBucket != lastTimeBucket) {
    // This write spans multiple time buckets and must be broken up.
    // Split off the events belonging to the first bucket and recurse; the
    // recursion handles writes spanning more than two buckets as well.
    val (first, rest) = write.events.partition {
      case (serialized, _) => serialized.timeBucket == firstTimeBucket
    }
    add(AwaitingWrite(first, OptionVal.None)).add(AwaitingWrite(rest, write.ack))
  } else {
    // Common case: all events in this write share one time bucket.
    val newSize = size + write.events.size
    if (writeRequired) {
      // A flush is already due: just queue behind it. Any time bucket
      // changes inside `pending` will be detected later when it is promoted.
      copy(size = newSize, pending = pending :+ write)
    } else if (nextBatch.headOption.exists(oldestEvent =>
        UUIDComparator.comparator
          .compare(write.events.head._1.timeUuid, oldestEvent.events.head._1.timeUuid) < 0)) {
      // Rare case where events have been received out of order: re-sort all
      // buffered writes by time UUID and rebuild the buffer from scratch.
      require(pending.isEmpty)
      val allWrites = (nextBatch :+ write).sortBy(_.events.head._1.timeUuid)(timeUuidOrdering)
      rebuild(allWrites)
    } else if (nextBatch.headOption.exists(_.events.head._1.timeBucket != write.events.head._1.timeBucket)) {
      // Time bucket has changed relative to the batch under construction:
      // force a flush of the current batch and park this write in pending.
      copy(size = newSize, pending = pending :+ write, writeRequired = true)
    } else if (newSize >= batchSize) {
      require(pending.isEmpty, "Pending should be empty if write not required")
      // Does the new write itself need to be broken up to fit the batch?
      if (newSize > batchSize) {
        // Fill the current batch up to exactly batchSize events; the
        // overflow (carrying the ack) goes to pending for the next batch.
        val toAdd = batchSize - size
        val (forNextWrite, forPending) = write.events.splitAt(toAdd)
        copy(
          size = newSize,
          nextBatch = nextBatch :+ AwaitingWrite(forNextWrite, OptionVal.None),
          pending = Vector(AwaitingWrite(forPending, write.ack)),
          writeRequired = true)
      } else {
        // Exactly fills the batch: append and mark a flush as required.
        copy(size = newSize, nextBatch = nextBatch :+ write, writeRequired = true)
      }
    } else {
      // Batch not yet full, same time bucket, in order: simply append.
      // (Fix: reuse the already-computed `newSize` instead of recomputing
      // `size + write.events.size`, consistent with the other branches.)
      copy(size = newSize, nextBatch = nextBatch :+ write)
    }
  }
}