in core/src/main/scala/org/apache/pekko/kafka/internal/CommitObservationLogic.scala [66:80]
/**
 * Decides whether `committable` becomes the deferred entry for `gtp`.
 *
 *  - No entry deferred for `gtp` yet: `committable` is stored in `deferredOffsets`.
 *  - A deferred entry exists whose offset for `gtp` is strictly lower than `offset`:
 *    `committable` replaces it in `deferredOffsets`, and the replaced entry is added
 *    to `offsetBatch`.
 *  - Anything else (deferred offset not lower, a batch without an offset for `gtp`,
 *    or another kind of committable): no state is changed.
 *
 * @param gtp         group/topic/partition key the incoming committable belongs to
 * @param committable incoming committable to defer
 * @param offset      offset of `committable` for `gtp`, compared against the deferred one
 */
private def updateBatchForPartition(gtp: GroupTopicPartition, committable: Committable, offset: Long): Unit = {
  // Single place for the "remember the incoming committable" step shared by three branches.
  def deferIncoming(): Unit =
    deferredOffsets = deferredOffsets + (gtp -> committable)

  deferredOffsets.get(gtp) match {
    case None =>
      deferIncoming()
    case Some(single: CommittableOffset) if single.partitionOffset.offset < offset =>
      deferIncoming()
      offsetBatch = offsetBatch.updated(single)
    // `exists` is false when the batch holds no offset for `gtp`, so no separate
    // `contains` check (and no partial `.head`) is needed.
    case Some(batch: CommittableOffsetBatch) if batch.offsets.get(gtp).exists(_ < offset) =>
      deferIncoming()
      offsetBatch = offsetBatch.updated(batch)
    case _ => ()
  }
}