in async_producer.go [629:719]
func (pp *partitionProducer) dispatch() {
	// try to prefetch the leader; if this doesn't work, we'll do a proper call to `updateLeader`
	// on the first message
	pp.leader, _ = pp.parent.client.Leader(pp.topic, pp.partition)
	if pp.leader != nil {
		pp.brokerProducer = pp.parent.getBrokerProducer(pp.leader)
		pp.parent.inFlight.Add(1) // we're generating a syn message; track it so we don't shut down while it's still inflight
		pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}
	}

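	// release any broker producer reference we still hold once this dispatch goroutine exits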
	defer func() {
		if pp.brokerProducer != nil {
			pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
		}
	}()

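	// leaderRetries counts consecutive failed leader lookups; it drives the backoff below and resets on success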
	leaderRetries := 0
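	// dispatch loop: process this partition's messages one at a time so ordering is preserved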
	for msg := range pp.input {
		if pp.brokerProducer != nil && pp.brokerProducer.abandoned != nil {
			select {
			case <-pp.brokerProducer.abandoned:
				// a message on the abandoned channel means that our current broker selection is out of date
				Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID())
				pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
				pp.brokerProducer = nil
				time.Sleep(pp.parent.conf.Producer.Retry.Backoff)
			default:
				// producer connection is still open.
			}
		}

		if msg.retries > pp.highWatermark {
			if err := pp.updateLeaderIfBrokerProducerIsNil(msg); err != nil {
				continue
			}
			// a new, higher, retry level; handle it and then back off
			pp.newHighWatermark(msg.retries)
			pp.backoff(msg.retries)
		} else if pp.highWatermark > 0 {
			// we are retrying something (else highWatermark would be 0) but this message is not a *new* retry level
			if msg.retries < pp.highWatermark {
				// in fact this message is not even the current retry level, so buffer it for now (unless it's just a fin)
				if msg.flags&fin == fin {
					pp.retryState[msg.retries].expectChaser = false
					pp.parent.inFlight.Done() // this fin is now handled and will be garbage collected
				} else {
					pp.retryState[msg.retries].buf = append(pp.retryState[msg.retries].buf, msg)
				}
				continue
			} else if msg.flags&fin == fin {
				// this message is of the current retry level (msg.retries == highWatermark) and the fin flag is set,
				// meaning this retry level is done and we can go down (at least) one level and flush that
				pp.retryState[pp.highWatermark].expectChaser = false
				pp.flushRetryBuffers()
				pp.parent.inFlight.Done() // this fin is now handled and will be garbage collected
				continue
			}
		}

		// if we made it this far then the current msg contains real data, and can be sent to the next goroutine
		// without breaking any of our ordering guarantees

		if pp.brokerProducer == nil {
			if err := pp.updateLeader(); err != nil {
				// Report the error on this message and wait for the backoff, then move on to the next one.
				pp.parent.returnError(msg, err)
				pp.backoff(leaderRetries)
				leaderRetries++
				continue
			}
			// On success, reset the retry count to 0.
			leaderRetries = 0
			Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
		}

		// Now that we know we have a broker to actually try and send this message to, generate the sequence
		// number for it.
		// All messages being retried (sent or not) have already had their retry count updated
		// Also, ignore "special" syn/fin messages used to sync the brokerProducer and the topicProducer.
		if pp.parent.conf.Producer.Idempotent && msg.retries == 0 && msg.flags == 0 {
			msg.sequenceNumber, msg.producerEpoch = pp.parent.txnmgr.getAndIncrementSequenceNumber(msg.Topic, msg.Partition)
			msg.hasSequence = true
		}

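		// when running inside a transaction, make sure this topic/partition is registered with the
		// current transaction before producing to it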
		if pp.parent.IsTransactional() {
			pp.parent.txnmgr.maybeAddPartitionToCurrentTxn(pp.topic, pp.partition)
		}

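		// hand the message off to the broker producer for the current leader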
		pp.brokerProducer.input <- msg
	}
}
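// The loop above reads a handful of producer settings: Producer.Retry.Backoff (used by
// pp.backoff and by the abandoned-broker sleep), Producer.Idempotent (sequence-number
// assignment), and the client's transactional state. Below is a minimal sketch of wiring
// those settings up through sarama's public config API; the broker address and concrete
// values are illustrative assumptions, not taken from this file:
//
//	config := sarama.NewConfig()
//	config.Version = sarama.V0_11_0_0                // idempotence needs brokers >= 0.11
//	config.Producer.Retry.Max = 5
//	config.Producer.Retry.Backoff = 250 * time.Millisecond
//	config.Producer.Idempotent = true
//	config.Net.MaxOpenRequests = 1                   // required when Idempotent is enabled
//	config.Producer.RequiredAcks = sarama.WaitForAll // also required for idempotence
//	config.Producer.Return.Errors = true
//	producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config)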