in core/src/main/scala/org/apache/pekko/kafka/internal/TransactionalProducerStage.scala [189:207]
/**
 * Decides whether an incoming message may be processed right away, based on
 * the current `producerAssignmentLifecycle` state.
 *
 * @param msg the envelope that has just been received
 * @return `true` when the lifecycle is `Assigned`; `false` when the message was
 *         stashed in `firstMessage` and a producer was requested for it
 * @throws IllegalStateException if a first message is already stashed while the
 *                               lifecycle is still `Unassigned`, or if a message
 *                               arrives while the lifecycle is `AsyncCreateRequestSent`
 */
private def parseFirstMessage(msg: Envelope[K, V, P]): Boolean =
  producerAssignmentLifecycle match {
    case Assigned =>
      true

    case AsyncCreateRequestSent =>
      throw new IllegalStateException(
        s"Should never receive new messages while in producer assignment state '$AsyncCreateRequestSent'")

    case Unassigned =>
      // A stashed message at this point is not expected: demand should have been
      // suspended until the producer is assigned.
      if (firstMessage.nonEmpty) {
        throw new IllegalStateException("Cannot reapply first message")
      }
      // Stash first, so the message can be sent once the producer is assigned.
      firstMessage = Some(msg)
      // Only now start the async producer request: the stash must already be in
      // place in case the future resolves eagerly rather than via the async callback.
      resolveProducer(generatedTransactionalConfig(msg))
      // No further messages are requested until the producer has been assigned.
      suspendDemand()
      false
  }