in async_producer.go [922:1014]
// run is the brokerProducer's main loop. It logs startup and then repeatedly
// selects over four events until bp.input is closed:
//   - a message arriving on bp.input, which is either handled as a control
//     message (syn/fin flags), handed back to the parent for retry, or added
//     to bp.buffer;
//   - the flush timer firing, which sets bp.timerFired;
//   - bp.buffer being accepted on the output channel, after which rollOver is
//     called;
//   - a response arriving on bp.responses, which is passed to handleResponse.
//
// When bp.input is closed, run calls bp.shutdown and returns.
func (bp *brokerProducer) run() {
// output and timerChan both start out nil. A nil channel never becomes ready
// in a select, so the send case and the timer case below stay disabled until
// these variables are assigned a real channel.
var output chan<- *produceSet
var timerChan <-chan time.Time
Logger.Printf("producer/broker/%d starting up\n", bp.broker.ID())
for {
select {
case msg, ok := <-bp.input:
// Every `continue` inside this case jumps straight to the next loop
// iteration and therefore skips the re-evaluation of `output` at the bottom
// of the loop; `output` keeps whatever value it had before.
if !ok {
// The input channel was closed: shut down and end the loop.
Logger.Printf("producer/broker/%d input chan closed\n", bp.broker.ID())
bp.shutdown()
return
}
// nil messages are ignored.
if msg == nil {
continue
}
if msg.flags&syn == syn {
// A syn-flagged message is a control message, not data: it is never added
// to the buffer. It records a nil entry for this topic/partition in
// currentRetries (creating the per-topic map on first use).
Logger.Printf("producer/broker/%d state change to [open] on %s/%d\n",
bp.broker.ID(), msg.Topic, msg.Partition)
if bp.currentRetries[msg.Topic] == nil {
bp.currentRetries[msg.Topic] = make(map[int32]error)
}
bp.currentRetries[msg.Topic][msg.Partition] = nil
// NOTE(review): presumably balances an inFlight.Add done by whoever sent the
// syn message; confirm against the sender.
bp.parent.inFlight.Done()
continue
}
if reason := bp.needsRetry(msg); reason != nil {
// needsRetry returned a non-nil reason: hand the message back to the parent
// with that reason instead of buffering it.
bp.parent.retryMessage(msg, reason)
// The partition's entry is removed from currentRetries only when bp.closing
// is nil and the message carries the fin flag.
if bp.closing == nil && msg.flags&fin == fin {
// we were retrying this partition but we can start processing again
delete(bp.currentRetries[msg.Topic], msg.Partition)
Logger.Printf("producer/broker/%d state change to [closed] on %s/%d\n",
bp.broker.ID(), msg.Topic, msg.Partition)
}
continue
}
// Reached only when needsRetry returned nil. A fin-flagged message is still
// not buffered; it is handed back to the parent with ErrShuttingDown.
if msg.flags&fin == fin {
// New broker producer that was caught up by the retry loop
bp.parent.retryMessage(msg, ErrShuttingDown)
DebugLogger.Printf("producer/broker/%d state change to [dying-%d] on %s/%d\n",
bp.broker.ID(), msg.retries, msg.Topic, msg.Partition)
continue
}
// NOTE(review): waitForSpace is presumably a blocking call that returns once
// the current buffer has been flushed; its second argument is false here and
// true for the epoch case below. Confirm the semantics in waitForSpace.
// If it returns an error, the message is handed back for retry with that
// error and is not buffered.
if bp.buffer.wouldOverflow(msg) {
Logger.Printf("producer/broker/%d maximum request accumulated, waiting for space\n", bp.broker.ID())
if err := bp.waitForSpace(msg, false); err != nil {
bp.parent.retryMessage(msg, err)
continue
}
}
// The epoch comparison is only made when the transaction manager has a
// producer ID assigned (producerID != noProducerID).
if bp.parent.txnmgr.producerID != noProducerID && bp.buffer.producerEpoch != msg.producerEpoch {
// The epoch was reset, need to roll the buffer over
Logger.Printf("producer/broker/%d detected epoch rollover, waiting for new buffer\n", bp.broker.ID())
if err := bp.waitForSpace(msg, true); err != nil {
bp.parent.retryMessage(msg, err)
continue
}
}
// A failure to add the message to the buffer is reported through returnError
// (not retryMessage), and the loop moves on to the next event.
if err := bp.buffer.add(msg); err != nil {
bp.parent.returnError(msg, err)
continue
}
// Start the flush timer if a flush frequency is configured and no timer is
// currently set, and begin selecting on its channel.
if bp.parent.conf.Producer.Flush.Frequency > 0 && bp.timer == nil {
bp.timer = time.NewTimer(bp.parent.conf.Producer.Flush.Frequency)
timerChan = bp.timer.C
}
case <-timerChan:
// The flush timer fired; record it so the check at the bottom of the loop
// enables the send case.
bp.timerFired = true
case output <- bp.buffer:
// bp.buffer was accepted on the output channel. Stop selecting on the old
// timer channel.
// NOTE(review): a new timer is only created above when bp.timer == nil, and
// bp.timerFired is never cleared in this function, so rollOver is presumably
// responsible for resetting bp.buffer, bp.timer and bp.timerFired; confirm.
bp.rollOver()
timerChan = nil
case response, ok := <-bp.responses:
// NOTE(review): when ok is false (channel closed) nothing is done here. A
// receive on a closed channel is always ready, so this presumably relies on
// bp.responses not being closed while this loop is still running; verify.
if ok {
bp.handleResponse(response)
}
}
// Enable the send case when the timer has fired or the buffer reports
// readyToFlush; otherwise set output to nil so the send case is disabled.
if bp.timerFired || bp.buffer.readyToFlush() {
output = bp.output
} else {
output = nil
}
}
}