private def updateBatchForPartition()

in core/src/main/scala/org/apache/pekko/kafka/internal/CommitObservationLogic.scala [66:80]


  /**
   * Decides whether `committable` becomes the deferred entry for `gtp`.
   *
   *  - No entry stored for `gtp`: `committable` is stored; `offsetBatch` is left as is.
   *  - The stored entry is a [[CommittableOffset]], or a [[CommittableOffsetBatch]] that has an
   *    offset for `gtp`, and that offset is strictly below `offset`: `committable` takes its
   *    place and the displaced entry is passed to `offsetBatch.updated`.
   *  - Anything else (e.g. the stored offset is not below `offset`): no state is changed.
   *
   * @param gtp key under which the deferred committable is tracked
   * @param committable value written to `deferredOffsets` when the update applies
   * @param offset offset compared against the one already stored for `gtp`
   */
  private def updateBatchForPartition(gtp: GroupTopicPartition, committable: Committable, offset: Long): Unit = {
    // Shared by every branch that accepts the new committable.
    def storeDeferred(): Unit =
      deferredOffsets = deferredOffsets + (gtp -> committable)

    deferredOffsets.get(gtp) match {
      case None =>
        storeDeferred()
      case Some(previous: CommittableOffset) if previous.partitionOffset.offset < offset =>
        storeDeferred()
        offsetBatch = offsetBatch.updated(previous)
      case Some(previous: CommittableOffsetBatch)
          if previous.offsets.get(gtp).exists(_ < offset) =>
        storeDeferred()
        offsetBatch = offsetBatch.updated(previous)
      case _ =>
        // Stored entry is not behind `offset` (or is of another kind): keep everything as is.
        ()
    }
  }