in async_producer.go [785:895]
// newBrokerProducer builds the brokerProducer for the given broker and starts
// the three goroutines backing it:
//   - bp.run;
//   - a bridge that turns every produceSet received on the output channel into
//     a produce request sent through broker.AsyncProduce;
//   - a forwarder that buffers the resulting responses and delivers them, in
//     order, on the responses channel.
//
// The pending channel is closed once the bridge channel has been closed and
// every in flight request has reported back; the responses channel is closed
// after the last buffered response has been delivered.
func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
	var (
		input     = make(chan *ProducerMessage)
		bridge    = make(chan *produceSet)
		pending   = make(chan *brokerProducerResponse)
		responses = make(chan *brokerProducerResponse)
	)

	bp := &brokerProducer{
		parent:         p,
		broker:         broker,
		input:          input,
		output:         bridge,
		responses:      responses,
		buffer:         newProduceSet(p),
		currentRetries: make(map[string]map[int32]error),
	}
	go withRecover(bp.run)

	// minimal bridge to make the network response `select`able
	go withRecover(func() {
		// Use a wait group to know if we still have in flight requests
		var wg sync.WaitGroup

		for set := range bridge {
			request := set.buildRequest()

			// Count the in flight requests to know when we can close the pending channel safely
			wg.Add(1)
			// Capture the current set to forward in the callback
			sendResponse := func(set *produceSet) ProduceCallback {
				return func(response *ProduceResponse, err error) {
					// Forward the response to make sure we do not block the responseReceiver
					pending <- &brokerProducerResponse{
						set: set,
						err: err,
						res: response,
					}
					wg.Done()
				}
			}(set)

			if p.IsTransactional() {
				// Add partition to tx before sending current batch
				err := p.txnmgr.publishTxnPartitions()
				if err != nil {
					// Request failed to be sent
					sendResponse(nil, err)
					continue
				}
			}

			// Use AsyncProduce vs Produce to not block waiting for the response
			// so that we can pipeline multiple produce requests and achieve higher throughput, see:
			// https://kafka.apache.org/protocol#protocol_network
			err := broker.AsyncProduce(request, sendResponse)
			if err != nil {
				// Request failed to be sent
				sendResponse(nil, err)
				continue
			}
			// Callback is not called when using NoResponse
			if p.conf.Producer.RequiredAcks == NoResponse {
				// Provide the expected nil response
				sendResponse(nil, nil)
			}
		}
		// Wait for all in flight requests to close the pending channel safely
		wg.Wait()
		close(pending)
	})

	// In order to avoid a deadlock when closing the broker on network or malformed response error
	// we use an intermediate channel to buffer and send pending responses in order
	// This is because the AsyncProduce callback inside the bridge is invoked from the broker
	// responseReceiver goroutine and closing the broker requires such goroutine to be finished
	go withRecover(func() {
		buf := queue.New()
		// Goroutine-local handle on the pending channel: it is set to nil once the channel
		// is seen closed so the select below stops considering it. A closed channel is always
		// ready to receive, so keeping it in the select would spin until the buffer is drained
		in := pending
		for {
			if buf.Length() == 0 {
				if in == nil {
					// We are done forwarding the last pending response
					close(responses)
					return
				}
				res, ok := <-in
				if !ok {
					// Nothing buffered and nothing more to come
					close(responses)
					return
				}
				buf.Add(res)
			}
			// Send the head pending response or buffer another one
			// so that we never block the callback
			headRes := buf.Peek().(*brokerProducerResponse)
			select {
			case res, ok := <-in:
				if !ok {
					// A nil channel is never ready: from now on only the send case can proceed
					in = nil
					continue
				}
				buf.Add(res)
			case responses <- headRes:
				buf.Remove()
			}
		}
	})

	// The abandoned channel is only allocated when Producer.Retry.Max is not positive
	if p.conf.Producer.Retry.Max <= 0 {
		bp.abandoned = make(chan struct{})
	}

	return bp
}