in core/src/main/scala/org/apache/pekko/kafka/internal/DefaultProducerStage.scala [142:181]
protected def initialInHandler(): Unit = producingInHandler()
protected def producingInHandler(): Unit = setHandler(stage.in, new DefaultInHandler())
// suspend demand until a Producer has been created
suspendDemandOutHandler()
initialInHandler()
protected def produce(in: Envelope[K, V, P]): Unit =
in match {
case msg: Message[K, V, P] =>
val r = Promise[Result[K, V, P]]()
awaitingConfirmation += 1
producer.send(msg.record, new SendCallback(msg, r))
postSend(msg)
val future = r.future.asInstanceOf[Future[OUT]]
push(stage.out, future)
case multiMsg: MultiMessage[K, V, P] =>
val promises = for {
msg <- multiMsg.records
} yield {
val r = Promise[MultiResultPart[K, V]]()
awaitingConfirmation += 1
producer.send(msg, new SendMultiCallback(msg, r))
r.future
}
postSend(multiMsg)
implicit val ec: ExecutionContext = this.materializer.executionContext
val res = Future.sequence(promises).map { parts =>
MultiResult(parts, multiMsg.passThrough)
}
val future = res.asInstanceOf[Future[OUT]]
push(stage.out, future)
case passthrough: PassThroughMessage[K, V, P] =>
postSend(passthrough)
val future = Future.successful(PassThroughResult[K, V, P](in.passThrough)).asInstanceOf[Future[OUT]]
push(stage.out, future)
}