func()

in pulsar/consumer_partition.go [1357:1466]


// dispatcher is the delivery loop of the partition consumer. It holds at most
// one batch of messages at a time and hands them out one by one, either to
// the application channel (pc.messageCh) or to the DLQ router
// (pc.dlq.Chan()), while also reacting to close, connection and clear-queue
// events.
//
// The loop exits when a receive on pc.closeCh succeeds, or when
// pc.connectedCh or pc.queueCh is closed.
func (pc *partitionConsumer) dispatcher() {
	defer func() {
		pc.log.Debug("exiting dispatch loop")
	}()
	// messages is the batch currently being delivered; its head is the next
	// message to send.
	var messages []*message
	for {
		// Both channels start out nil on every iteration. A nil channel is
		// never ready in a select, so exactly one of "receive a new batch"
		// and "send the next message" is enabled below.
		var queueCh chan []*message
		var messageCh chan ConsumerMessage
		var nextMessage ConsumerMessage
		var nextMessageSize int

		// are there more messages to send?
		if len(messages) > 0 {
			nextMessage = ConsumerMessage{
				Consumer: pc.parentConsumer,
				Message:  messages[0],
			}
			nextMessageSize = messages[0].size()

			if pc.dlq.shouldSendToDlq(&nextMessage) {
				// pass the message to the DLQ router
				pc.metrics.DlqCounter.Inc()
				messageCh = pc.dlq.Chan()
			} else {
				// pass the message to application channel
				messageCh = pc.messageCh
			}

			// NOTE(review): these metrics (and the DLQ counter above) are
			// adjusted before the send below is known to happen. If a
			// connection or clear-queue event wins the select instead, the
			// pending batch is dropped and its remaining messages are never
			// subtracted here - confirm this is intended.
			pc.metrics.PrefetchedMessages.Dec()
			pc.metrics.PrefetchedBytes.Sub(float64(len(messages[0].payLoad)))
		} else {
			// nothing pending: allow the next batch to be read from the queue
			queueCh = pc.queueCh
		}

		select {
		case <-pc.closeCh:
			return

		case _, ok := <-pc.connectedCh:
			if !ok {
				return
			}
			pc.log.Debug("dispatcher received connection event")

			// drop whatever is left of the batch being delivered
			messages = nil

			// reset available permits
			pc.availablePermits.reset()

			var initialPermits uint32
			if pc.options.autoReceiverQueueSize {
				initialPermits = uint32(pc.currentQueueSize.Load())
			} else {
				initialPermits = uint32(pc.maxQueueSize)
			}

			pc.log.Debugf("dispatcher requesting initial permits=%d", initialPermits)
			// send initial permits; a failure is logged and the loop carries on
			if err := pc.internalFlow(initialPermits); err != nil {
				pc.log.WithError(err).Error("unable to send initial permits to broker")
			}

		case msgs, ok := <-queueCh:
			if !ok {
				return
			}
			// we only read messages here after the consumer has processed all messages
			// in the previous batch
			messages = msgs

		// if the messageCh is nil or the messageCh is full this will not be selected
		case messageCh <- nextMessage:
			// allow this message to be garbage collected
			messages[0] = nil
			messages = messages[1:]

			pc.availablePermits.inc()

			if pc.options.autoReceiverQueueSize {
				pc.incomingMessages.Dec()
				pc.client.memLimit.ReleaseMemory(int64(nextMessageSize))
				pc.expectMoreIncomingMessages()
			}

		case clearQueueCb := <-pc.clearQueueCh:
			// drain the message queue on any new connection by sending a
			// special nil message to the channel so we know when to stop dropping messages
			var nextMessageInQueue *trackingMessageID
			// The sentinel is sent from its own goroutine so that the send
			// cannot block this loop, which is the one receiving from the
			// channel.
			go func() {
				pc.queueCh <- nil
			}()

			for m := range pc.queueCh {
				// a nil batch is the sentinel: the queue has been drained
				if m == nil {
					break
				}
				// Remember the ID of the first message that was still queued.
				// Empty (non-nil) batches are skipped: indexing m[0] on them
				// would panic.
				if nextMessageInQueue == nil && len(m) > 0 {
					nextMessageInQueue = toTrackingMessageID(m[0].msgID)
				}
				if pc.options.autoReceiverQueueSize {
					pc.incomingMessages.Sub(int32(len(m)))
				}
			}

			// the batch being delivered is discarded as well
			messages = nil

			// report the first drained message ID (nil if nothing was queued)
			clearQueueCb(nextMessageInQueue)
		}
	}
}