in async_producer.go [389:471]
// dispatcher drains p.input until that channel is closed, routing each
// message to a per-topic channel obtained from newTopicProducer (one channel
// per distinct topic, created on first use). Along the way it:
//
//   - drops nil messages with a log line;
//   - handles control messages flagged endtxn or shutdown, which are consumed
//     here and never forwarded;
//   - rejects first-attempt messages (retries == 0) once shutdown has been
//     seen, or when a transactional producer is not currently in a transaction;
//   - applies the configured interceptors;
//   - rejects messages carrying headers on pre-0.11 configurations and
//     messages larger than Producer.MaxMessageBytes.
//
// When p.input is closed, every per-topic channel created here is closed.
func (p *asyncProducer) dispatcher() {
	topicInputs := make(map[string]chan<- *ProducerMessage)
	draining := false

	for msg := range p.input {
		// Control-flow messages: none of these are forwarded to a topic channel.
		switch {
		case msg == nil:
			Logger.Println("Something tried to send a nil message, it was ignored.")
			continue

		case msg.flags&endtxn != 0:
			// An end-of-transaction marker either commits or aborts,
			// depending on whether committxn is also set.
			var err error
			if msg.flags&committxn == 0 {
				err = p.txnmgr.transitionTo(ProducerTxnFlagEndTransaction|ProducerTxnFlagAbortingTransaction, nil)
			} else {
				err = p.txnmgr.transitionTo(ProducerTxnFlagEndTransaction|ProducerTxnFlagCommittingTransaction, nil)
			}
			if err != nil {
				Logger.Printf("producer/txnmgr unable to end transaction %s", err)
			}
			p.inFlight.Done()
			continue

		case msg.flags&shutdown != 0:
			// From here on, first-attempt messages are refused (see below).
			draining = true
			p.inFlight.Done()
			continue
		}

		// Checks that apply only to a message's first pass through the
		// dispatcher; retried messages skip straight to the common path.
		if msg.retries == 0 {
			if draining {
				// returnError is deliberately not used here: it decrements
				// the in-flight wait group, which has not been (and should
				// not be) incremented for this message.
				pErr := &ProducerError{Msg: msg, Err: ErrShuttingDown}
				if !p.conf.Producer.Return.Errors {
					Logger.Println(pErr)
					continue
				}
				p.errors <- pErr
				continue
			}

			p.inFlight.Add(1)

			// A transactional producer may only accept new records while a
			// transaction is in progress. Retried messages never reach this
			// check because they already belong to a transaction.
			if p.IsTransactional() && p.txnmgr.currentTxnStatus()&ProducerTxnFlagInTransaction == 0 {
				Logger.Printf("attempt to send message when transaction is not started or is in ending state, got %d, expect %d\n", p.txnmgr.currentTxnStatus(), ProducerTxnFlagInTransaction)
				p.returnError(msg, ErrTransactionNotReady)
				continue
			}
		}

		for _, ic := range p.conf.Producer.Interceptors {
			msg.safelyApplyInterceptor(ic)
		}

		// Headers are only accepted when the configured version is at
		// least 0.11; that same condition selects version 2 for sizing.
		supportsHeaders := p.conf.Version.IsAtLeast(V0_11_0_0)
		if !supportsHeaders && msg.Headers != nil {
			p.returnError(msg, ConfigurationError("Producing headers requires Kafka at least v0.11"))
			continue
		}
		version := 1
		if supportsHeaders {
			version = 2
		}

		if size := msg.ByteSize(version); size > p.conf.Producer.MaxMessageBytes {
			p.returnError(msg, ConfigurationError(fmt.Sprintf("Attempt to produce message larger than configured Producer.MaxMessageBytes: %d > %d", size, p.conf.Producer.MaxMessageBytes)))
			continue
		}

		// Create the topic's channel on first use and remember it.
		input := topicInputs[msg.Topic]
		if input == nil {
			input = p.newTopicProducer(msg.Topic)
			topicInputs[msg.Topic] = input
		}
		input <- msg
	}

	// p.input has been closed: close every per-topic channel created above.
	for _, input := range topicInputs {
		close(input)
	}
}