private[internal] def committerFor()

in core/src/main/scala/org/apache/pekko/kafka/internal/MessageBuilder.scala [178:207]


  private[internal] def committerFor(groupTopicPartition: GroupTopicPartition) =
    committers.getOrElse(
      groupTopicPartition,
      throw new IllegalStateException(s"Unknown committer, got [$groupTopicPartition] (${committers.keys})"))

  private def updatedWithOffset(newOffset: CommittableOffset): CommittableOffsetBatch = {
    val partitionOffset = newOffset.partitionOffset
    val key = partitionOffset.key
    val metadata = newOffset match {
      case offset: CommittableOffsetMetadata =>
        offset.metadata
      case null =>
        OffsetFetchResponse.NO_METADATA
    }

    val newOffsets =
      offsetsAndMetadata.updated(key, new OffsetAndMetadata(newOffset.partitionOffset.offset + 1L, metadata))

    val newCommitter = newOffset match {
      case c: CommittableOffsetImpl => c.committer
      case _ =>
        throw new IllegalArgumentException(
          s"Unknown CommittableOffset, got [${newOffset.getClass.getName}], " +
          s"expected [${classOf[CommittableOffsetImpl].getName}]")
    }

    // the last `KafkaAsyncConsumerCommitterRef` wins (see https://github.com/akka/alpakka-kafka/issues/942)
    val newCommitters = committers.updated(key, newCommitter)
    new CommittableOffsetBatchImpl(newOffsets, newCommitters, batchSize + 1)
  }