in pulsar/consumer_partition.go [1357:1466]
func (pc *partitionConsumer) dispatcher() {
defer func() {
pc.log.Debug("exiting dispatch loop")
}()
var messages []*message
for {
var queueCh chan []*message
var messageCh chan ConsumerMessage
var nextMessage ConsumerMessage
var nextMessageSize int
// are there more messages to send?
if len(messages) > 0 {
nextMessage = ConsumerMessage{
Consumer: pc.parentConsumer,
Message: messages[0],
}
nextMessageSize = messages[0].size()
if pc.dlq.shouldSendToDlq(&nextMessage) {
// pass the message to the DLQ router
pc.metrics.DlqCounter.Inc()
messageCh = pc.dlq.Chan()
} else {
// pass the message to application channel
messageCh = pc.messageCh
}
pc.metrics.PrefetchedMessages.Dec()
pc.metrics.PrefetchedBytes.Sub(float64(len(messages[0].payLoad)))
} else {
queueCh = pc.queueCh
}
select {
case <-pc.closeCh:
return
case _, ok := <-pc.connectedCh:
if !ok {
return
}
pc.log.Debug("dispatcher received connection event")
messages = nil
// reset available permits
pc.availablePermits.reset()
var initialPermits uint32
if pc.options.autoReceiverQueueSize {
initialPermits = uint32(pc.currentQueueSize.Load())
} else {
initialPermits = uint32(pc.maxQueueSize)
}
pc.log.Debugf("dispatcher requesting initial permits=%d", initialPermits)
// send initial permits
if err := pc.internalFlow(initialPermits); err != nil {
pc.log.WithError(err).Error("unable to send initial permits to broker")
}
case msgs, ok := <-queueCh:
if !ok {
return
}
// we only read messages here after the consumer has processed all messages
// in the previous batch
messages = msgs
// if the messageCh is nil or the messageCh is full this will not be selected
case messageCh <- nextMessage:
// allow this message to be garbage collected
messages[0] = nil
messages = messages[1:]
pc.availablePermits.inc()
if pc.options.autoReceiverQueueSize {
pc.incomingMessages.Dec()
pc.client.memLimit.ReleaseMemory(int64(nextMessageSize))
pc.expectMoreIncomingMessages()
}
case clearQueueCb := <-pc.clearQueueCh:
// drain the message queue on any new connection by sending a
// special nil message to the channel so we know when to stop dropping messages
var nextMessageInQueue *trackingMessageID
go func() {
pc.queueCh <- nil
}()
for m := range pc.queueCh {
// the queue has been drained
if m == nil {
break
} else if nextMessageInQueue == nil {
nextMessageInQueue = toTrackingMessageID(m[0].msgID)
}
if pc.options.autoReceiverQueueSize {
pc.incomingMessages.Sub(int32(len(m)))
}
}
messages = nil
clearQueueCb(nextMessageInQueue)
}
}
}