in core/src/main/scala/org/apache/pekko/kafka/internal/CommitObservationLogic.scala [41:64]
/**
 * Registers `committable` with the pending `offsetBatch` and reports whether the batch
 * has grown to the configured limit.
 *
 * When `settings.when` equals `OffsetFirstObserved`, the committable is merged directly via
 * `offsetBatch.updated`. In every other case the committable is dispatched on its concrete
 * type and handed, partition by partition, to `updateBatchForPartition`.
 *
 * @param committable the single offset or offset batch to register
 * @return `true` when `offsetBatch.batchSize` is at least `settings.maxBatch`
 * @throws IllegalArgumentException if `committable` is `null` or is neither a
 *                                  `CommittableOffset` nor a `CommittableOffsetBatchImpl`
 */
def updateBatch(committable: Committable): Boolean = {
  if (settings.when != OffsetFirstObserved) { // CommitWhen.NextOffsetObserved
    committable match {
      case one: CommittableOffset =>
        // A single offset carries its own partition key and offset.
        updateBatchForPartition(one.partitionOffset.key, one, one.partitionOffset.offset)
      case many: CommittableOffsetBatchImpl =>
        // Handle each partition of the batch separately, passing along only the
        // slice of the batch that belongs to that partition.
        many.offsetsAndMetadata.foreach { case (partition, metadata) =>
          val partitionSlice = many.filter(key => key.equals(partition))
          updateBatchForPartition(partition, partitionSlice, metadata.offset())
        }
      case null =>
        // Matched before the catch-all so that `getClass` is never called on null.
        throw new IllegalArgumentException(
          s"Unknown Committable implementation, got [null]")
      case other =>
        throw new IllegalArgumentException(
          s"Unknown Committable implementation, got [${other.getClass.getName}]")
    }
  } else {
    offsetBatch = offsetBatch.updated(committable)
  }

  val limitReached = offsetBatch.batchSize >= settings.maxBatch
  limitReached
}