in core/src/main/scala/org/apache/pekko/kafka/internal/MessageBuilder.scala [178:207]
private[internal] def committerFor(groupTopicPartition: GroupTopicPartition) =
committers.getOrElse(
groupTopicPartition,
throw new IllegalStateException(s"Unknown committer, got [$groupTopicPartition] (${committers.keys})"))
private def updatedWithOffset(newOffset: CommittableOffset): CommittableOffsetBatch = {
val partitionOffset = newOffset.partitionOffset
val key = partitionOffset.key
val metadata = newOffset match {
case offset: CommittableOffsetMetadata =>
offset.metadata
case null =>
OffsetFetchResponse.NO_METADATA
}
val newOffsets =
offsetsAndMetadata.updated(key, new OffsetAndMetadata(newOffset.partitionOffset.offset + 1L, metadata))
val newCommitter = newOffset match {
case c: CommittableOffsetImpl => c.committer
case _ =>
throw new IllegalArgumentException(
s"Unknown CommittableOffset, got [${newOffset.getClass.getName}], " +
s"expected [${classOf[CommittableOffsetImpl].getName}]")
}
// the last `KafkaAsyncConsumerCommitterRef` wins (see https://github.com/akka/alpakka-kafka/issues/942)
val newCommitters = committers.updated(key, newCommitter)
new CommittableOffsetBatchImpl(newOffsets, newCommitters, batchSize + 1)
}