private def parseFirstMessage()

in core/src/main/scala/org/apache/pekko/kafka/internal/TransactionalProducerStage.scala [189:207]


  private def parseFirstMessage(msg: Envelope[K, V, P]): Boolean =
    producerAssignmentLifecycle match {
      case Assigned                            => true
      case Unassigned if firstMessage.nonEmpty =>
        // this should never happen because demand should be suspended until the producer is assigned
        throw new IllegalStateException("Cannot reapply first message")
      case Unassigned =>
        // stash the first message so it can be sent after the producer is assigned
        firstMessage = Some(msg)
        // initiate async async producer request _after_ first message is stashed in case future eagerly resolves
        // instead of asynccallback
        resolveProducer(generatedTransactionalConfig(msg))
        // suspend demand after we receive the first message until the producer is assigned
        suspendDemand()
        false
      case AsyncCreateRequestSent =>
        throw new IllegalStateException(
          s"Should never receive new messages while in producer assignment state '$AsyncCreateRequestSent'")
    }