in core/src/main/scala/org/apache/pekko/kafka/internal/CommittingProducerSinkStage.scala [106:129]
/**
 * Dispatches one incoming envelope according to its concrete type.
 *
 * Bookkeeping performed here, always before anything is handed to `producer`:
 *  - `awaitingCommitResult` is incremented once per envelope, whatever its type;
 *  - `awaitingProduceResult` is incremented once per record passed to `producer.send`.
 *
 * Envelopes that carry no record (an empty [[MultiMessage]] or a [[PassThroughMessage]])
 * skip the producer entirely and pass their pass-through straight to `collectOffset`.
 *
 * @param in the envelope to process
 */
private def produce(in: Envelope[K, V, Committable]): Unit =
  in match {
    case single: Message[K, V, Committable] =>
      awaitingCommitResult += 1
      awaitingProduceResult += 1
      producer.send(single.record, new SendCallback(single.passThrough))

    case multi: MultiMessage[K, V, Committable] =>
      awaitingCommitResult += 1
      val records = multi.records
      if (records.isEmpty) {
        // No record to send, so no send callback will ever fire for this envelope:
        // the pass-through is handed to `collectOffset` directly.
        collectOffset(multi.passThrough)
      } else {
        val recordCount = records.size
        awaitingProduceResult += recordCount
        // A single callback instance is shared by every record of this envelope.
        val sharedCallback = new SendMultiCallback(recordCount, multi.passThrough)
        records.foreach(record => producer.send(record, sharedCallback))
      }

    case passThroughOnly: PassThroughMessage[K, V, Committable] =>
      awaitingCommitResult += 1
      collectOffset(passThroughOnly.passThrough)
  }