def updateBatch()

in core/src/main/scala/org/apache/pekko/kafka/internal/CommitObservationLogic.scala [41:64]


  def updateBatch(committable: Committable): Boolean = {
    if (settings.when == OffsetFirstObserved) {
      offsetBatch = offsetBatch.updated(committable)
    } else { // CommitWhen.NextOffsetObserved
      committable match {
        case single: CommittableOffset =>
          val gtp = single.partitionOffset.key
          updateBatchForPartition(gtp, single, single.partitionOffset.offset)
        case batch: CommittableOffsetBatchImpl =>
          for { (gtp, offsetAndMetadata) <- batch.offsetsAndMetadata } updateBatchForPartition(
            gtp,
            batch.filter(_.equals(gtp)),
            offsetAndMetadata.offset())
        case null =>
          throw new IllegalArgumentException(
            s"Unknown Committable implementation, got [null]")
        case unknownImpl =>
          throw new IllegalArgumentException(
            s"Unknown Committable implementation, got [${unknownImpl.getClass.getName}]")

      }
    }
    offsetBatch.batchSize >= settings.maxBatch
  }