protected def initialInHandler()

in core/src/main/scala/org/apache/pekko/kafka/internal/DefaultProducerStage.scala [142:181]


  protected def initialInHandler(): Unit = producingInHandler()
  protected def producingInHandler(): Unit = setHandler(stage.in, new DefaultInHandler())

  // suspend demand until a Producer has been created
  suspendDemandOutHandler()
  initialInHandler()

  protected def produce(in: Envelope[K, V, P]): Unit =
    in match {
      case msg: Message[K, V, P] =>
        val r = Promise[Result[K, V, P]]()
        awaitingConfirmation += 1
        producer.send(msg.record, new SendCallback(msg, r))
        postSend(msg)
        val future = r.future.asInstanceOf[Future[OUT]]
        push(stage.out, future)

      case multiMsg: MultiMessage[K, V, P] =>
        val promises = for {
          msg <- multiMsg.records
        } yield {
          val r = Promise[MultiResultPart[K, V]]()
          awaitingConfirmation += 1
          producer.send(msg, new SendMultiCallback(msg, r))
          r.future
        }
        postSend(multiMsg)
        implicit val ec: ExecutionContext = this.materializer.executionContext
        val res = Future.sequence(promises).map { parts =>
          MultiResult(parts, multiMsg.passThrough)
        }
        val future = res.asInstanceOf[Future[OUT]]
        push(stage.out, future)

      case passthrough: PassThroughMessage[K, V, P] =>
        postSend(passthrough)
        val future = Future.successful(PassThroughResult[K, V, P](in.passThrough)).asInstanceOf[Future[OUT]]
        push(stage.out, future)

    }