private def produce()

in core/src/main/scala/org/apache/pekko/kafka/internal/CommittingProducerSinkStage.scala [106:129]


  /**
   * Dispatches one incoming envelope according to its concrete kind.
   *
   *  - `Message`: bumps both pending counters by one and sends its single record.
   *  - `MultiMessage` with records: adds the record count to `awaitingProduceResult`,
   *    adds one to `awaitingCommitResult`, and sends every record with one shared callback.
   *  - `MultiMessage` without records, or `PassThroughMessage`: nothing is sent; only
   *    `awaitingCommitResult` is bumped and the pass-through goes straight to `collectOffset`.
   *
   * In every path `awaitingCommitResult` grows by exactly one per envelope, regardless of
   * how many records the envelope carries.
   *
   * @param in the envelope to handle
   */
  private def produce(in: Envelope[K, V, Committable]): Unit =
    in match {
      case single: Message[K, V, Committable] =>
        awaitingProduceResult += 1
        awaitingCommitResult += 1
        val callback = new SendCallback(single.passThrough)
        producer.send(single.record, callback)

      case multi: MultiMessage[K, V, Committable] =>
        val records = multi.records
        if (records.isEmpty) {
          // Nothing to send: treat the envelope like a pure pass-through.
          awaitingCommitResult += 1
          collectOffset(multi.passThrough)
        } else {
          val recordCount = records.size
          awaitingProduceResult += recordCount
          awaitingCommitResult += 1
          // One callback instance is shared by all sends of this envelope.
          val sharedCallback = new SendMultiCallback(recordCount, multi.passThrough)
          records.foreach(record => producer.send(record, sharedCallback))
        }

      case passThroughOnly: PassThroughMessage[K, V, Committable] =>
        awaitingCommitResult += 1
        collectOffset(passThroughOnly.passThrough)
    }