func()

in consumer/push_consumer.go [629:910]


// pullMessage runs the pull task for a single PullRequest until the consumer
// is closed (pc.done) or the request's process queue is dropped.
//
// It first starts a background goroutine that keeps calling submitToConsume
// for the request's queue. It then loops in the calling goroutine: every
// iteration waits out the current delay, runs the state, pause, flow-control
// and (for orderly consumption) lock checks, sends one pull request to the
// broker and handles the result according to its status. Whenever a check
// fails, sleepTime is set to the matching delay and the loop restarts.
func (pc *pushConsumer) pullMessage(request *PullRequest) {
	rlog.Debug("start a new Pull Message task for PullRequest", map[string]interface{}{
		rlog.LogKeyPullRequest: request.String(),
	})
	var sleepTime time.Duration
	pq := request.pq

	// Consume side: keep handing the queue to submitToConsume until the
	// consumer is closed or the queue is dropped.
	go primitive.WithRecover(func() {
		for {
			select {
			case <-pc.done:
				rlog.Info("push consumer close pullMessage.", map[string]interface{}{
					rlog.LogKeyConsumerGroup: pc.consumerGroup,
				})
				return
			default:
				pc.submitToConsume(request.pq, request.mq)
				if request.pq.IsDroppd() {
					rlog.Info("push consumer quit pullMessage for dropped queue.", map[string]interface{}{
						rlog.LogKeyConsumerGroup: pc.consumerGroup,
					})
					return
				}
			}
		}
	})

	// Pull side. Every `continue` below restarts the loop, which re-checks
	// pc.done and the dropped flag and then waits for sleepTime.
	for {
		select {
		case <-pc.done:
			rlog.Info("push consumer close message handle.", map[string]interface{}{
				rlog.LogKeyConsumerGroup: pc.consumerGroup,
			})
			return
		default:
		}

		if pq.IsDroppd() {
			rlog.Debug("the request was dropped, so stop task", map[string]interface{}{
				rlog.LogKeyPullRequest: request.String(),
			})
			return
		}
		if sleepTime > 0 {
			rlog.Debug(fmt.Sprintf("pull MessageQueue: %d sleep %d ms for mq: %v", request.mq.QueueId, sleepTime/time.Millisecond, request.mq), nil)
			// Wait on a timer instead of time.Sleep so that closing pc.done
			// ends the task right away rather than after the full delay.
			timer := time.NewTimer(sleepTime)
			select {
			case <-pc.done:
				timer.Stop()
				rlog.Info("push consumer close message handle.", map[string]interface{}{
					rlog.LogKeyConsumerGroup: pc.consumerGroup,
				})
				return
			case <-timer.C:
			}
		}
		// Reset the delay to the configured pull interval; the checks below
		// override it when they need a different back-off.
		sleepTime = pc.option.PullInterval.Load()
		pq.lastPullTime.Store(time.Now())
		if err := pc.makeSureStateOK(); err != nil {
			rlog.Warning("consumer state error", map[string]interface{}{
				rlog.LogKeyUnderlayError: err.Error(),
			})
			sleepTime = _PullDelayTimeWhenError
			continue
		}

		if pc.pause {
			rlog.Debug(fmt.Sprintf("consumer [%s] of [%s] was paused, execute pull request [%s] later",
				pc.option.InstanceName, pc.consumerGroup, request.String()), nil)
			sleepTime = _PullDelayTimeWhenSuspend
			continue
		}

		// Flow control on the number of cached messages. The warning is only
		// emitted when the counter is a multiple of 1000.
		cachedMessageSizeInMiB := int(pq.cachedMsgSize.Load() / Mb)
		if pq.cachedMsgCount.Load() > pc.option.PullThresholdForQueue.Load() {
			if pc.queueFlowControlTimes%1000 == 0 {
				rlog.Warning("the cached message count exceeds the threshold, so do flow control", map[string]interface{}{
					"PullThresholdForQueue": pc.option.PullThresholdForQueue.Load(),
					"minOffset":             pq.Min(),
					"maxOffset":             pq.Max(),
					"count":                 pq.cachedMsgCount.Load(),
					"size(MiB)":             cachedMessageSizeInMiB,
					"flowControlTimes":      pc.queueFlowControlTimes,
					rlog.LogKeyPullRequest:  request.String(),
				})
			}
			pc.queueFlowControlTimes++
			sleepTime = _PullDelayTimeWhenFlowControl
			continue
		}

		// Flow control on the size of cached messages (same counter and
		// log throttling as the count check above).
		if cachedMessageSizeInMiB > int(pc.option.PullThresholdSizeForQueue.Load()) {
			if pc.queueFlowControlTimes%1000 == 0 {
				rlog.Warning("the cached message size exceeds the threshold, so do flow control", map[string]interface{}{
					"PullThresholdSizeForQueue": pc.option.PullThresholdSizeForQueue.Load(),
					"minOffset":                 pq.Min(),
					"maxOffset":                 pq.Max(),
					"count":                     pq.cachedMsgCount.Load(),
					"size(MiB)":                 cachedMessageSizeInMiB,
					"flowControlTimes":          pc.queueFlowControlTimes,
					rlog.LogKeyPullRequest:      request.String(),
				})
			}
			pc.queueFlowControlTimes++
			sleepTime = _PullDelayTimeWhenFlowControl
			continue
		}

		if !pc.consumeOrderly {
			// Concurrent consumption: throttle when the span reported by the
			// process queue exceeds the configured maximum.
			if pq.getMaxSpan() > pc.option.ConsumeConcurrentlyMaxSpan {
				if pc.queueMaxSpanFlowControlTimes%1000 == 0 {
					rlog.Warning("the queue's messages span too long, so do flow control", map[string]interface{}{
						"ConsumeConcurrentlyMaxSpan": pc.option.ConsumeConcurrentlyMaxSpan,
						"minOffset":                  pq.Min(),
						"maxOffset":                  pq.Max(),
						"maxSpan":                    pq.getMaxSpan(),
						// Report the counter that this branch throttles on
						// and increments.
						"flowControlTimes":     pc.queueMaxSpanFlowControlTimes,
						rlog.LogKeyPullRequest: request.String(),
					})
				}
				pc.queueMaxSpanFlowControlTimes++
				sleepTime = _PullDelayTimeWhenFlowControl
				continue
			}
		} else {
			// Orderly consumption: only pull while the process queue reports
			// that it is locked.
			if !pq.IsLock() {
				rlog.Info("pull message later because not locked in broker", map[string]interface{}{
					rlog.LogKeyPullRequest: request.String(),
				})
				sleepTime = _PullDelayTimeWhenError
				continue
			}
			if !request.lockedFirst {
				// First pull with the lock held: recompute the start offset
				// and use it in place of request.nextOffset.
				offset, err := pc.computePullFromWhereWithException(request.mq)
				if err != nil {
					rlog.Warning("computePullFromWhere from broker error", map[string]interface{}{
						rlog.LogKeyUnderlayError: err.Error(),
					})
					sleepTime = _PullDelayTimeWhenError
					continue
				}

				brokerBusy := offset < request.nextOffset
				rlog.Info("the first time to pull message, so fix offset from broker, offset maybe changed", map[string]interface{}{
					rlog.LogKeyPullRequest:      request.String(),
					rlog.LogKeyValueChangedFrom: request.nextOffset,
					rlog.LogKeyValueChangedTo:   offset,
					"brokerBusy":                brokerBusy,
				})
				if brokerBusy {
					rlog.Info("[NOTIFY_ME] the first time to pull message, but pull request offset larger than "+
						"broker consume offset", map[string]interface{}{"offset": offset})
				}
				request.lockedFirst = true
				request.nextOffset = offset
			}
		}

		v, exist := pc.subscriptionDataTable.Load(request.mq.Topic)
		if !exist {
			rlog.Info("find the consumer's subscription failed", map[string]interface{}{
				rlog.LogKeyPullRequest: request.String(),
			})
			sleepTime = _PullDelayTimeWhenError
			continue
		}
		beginTime := time.Now()
		var (
			commitOffsetEnable bool
			commitOffsetValue  int64
			subExpression      string
		)

		if pc.model == Clustering {
			// The read error is deliberately ignored here: the commit-offset
			// flag is only enabled when a positive offset was read.
			commitOffsetValue, _ = pc.storage.readWithException(request.mq, _ReadFromMemory)
			if commitOffsetValue > 0 {
				commitOffsetEnable = true
			}
		}

		sd := v.(*internal.SubscriptionData)
		classFilter := sd.ClassFilterMode
		if pc.option.PostSubscriptionWhenPull && !classFilter {
			subExpression = sd.SubString
		}

		sysFlag := buildSysFlag(commitOffsetEnable, true, subExpression != "", classFilter)

		pullRequest := &internal.PullMessageRequestHeader{
			ConsumerGroup:        pc.consumerGroup,
			Topic:                request.mq.Topic,
			QueueId:              int32(request.mq.QueueId),
			QueueOffset:          request.nextOffset,
			MaxMsgNums:           pc.option.PullBatchSize.Load(),
			SysFlag:              sysFlag,
			CommitOffset:         commitOffsetValue,
			SubExpression:        subExpression,
			ExpressionType:       string(TAG),
			SuspendTimeoutMillis: 20 * time.Second,
			BrokerName:           request.mq.BrokerName,
		}
		//
		//if data.ExpType == string(TAG) {
		//	pullRequest.SubVersion = 0
		//} else {
		//	pullRequest.SubVersion = data.SubVersion
		//}

		brokerResult := pc.defaultConsumer.tryFindBroker(request.mq)
		if brokerResult == nil {
			rlog.Warning("no broker found for mq", map[string]interface{}{
				rlog.LogKeyPullRequest: request.mq.String(),
			})
			sleepTime = _PullDelayTimeWhenError
			continue
		}

		// When the chosen broker is a slave, the commit-offset flag is
		// cleared from the request's sysFlag.
		if brokerResult.Slave {
			pullRequest.SysFlag = clearCommitOffsetFlag(pullRequest.SysFlag)
		}

		result, err := pc.client.PullMessage(context.Background(), brokerResult.BrokerAddr, pullRequest)
		if err != nil {
			rlog.Warning("pull message from broker error", map[string]interface{}{
				rlog.LogKeyBroker:        brokerResult.BrokerAddr,
				rlog.LogKeyUnderlayError: err.Error(),
			})
			sleepTime = _PullDelayTimeWhenError
			continue
		}

		if result.Status == primitive.PullBrokerTimeout {
			rlog.Warning("pull broker timeout", map[string]interface{}{
				rlog.LogKeyBroker: brokerResult.BrokerAddr,
			})
			sleepTime = _PullDelayTimeWhenError
			continue
		}

		pc.processPullResult(request.mq, result, sd)
		if result.MaxOffset > pq.maxOffsetInQueue {
			pq.maxOffsetInQueue = result.MaxOffset
		}

		switch result.Status {
		case primitive.PullFound:
			rlog.Debug(fmt.Sprintf("Topic: %s, QueueId: %d found messages.", request.mq.Topic, request.mq.QueueId), nil)
			prevRequestOffset := request.nextOffset
			request.nextOffset = result.NextBeginOffset

			// Pull round-trip time in milliseconds, measured from just before
			// the request header was built.
			rt := time.Since(beginTime) / time.Millisecond
			pc.stat.increasePullRT(pc.consumerGroup, request.mq.Topic, int64(rt))

			msgFounded := result.GetMessageExts()
			// MaxInt64 keeps the sanity check below from firing on
			// firstMsgOffset when no messages were returned.
			firstMsgOffset := int64(math.MaxInt64)
			if len(msgFounded) != 0 {
				firstMsgOffset = msgFounded[0].QueueOffset
				pc.stat.increasePullTPS(pc.consumerGroup, request.mq.Topic, len(msgFounded))
				pq.putMessage(msgFounded...)
			}
			if result.NextBeginOffset < prevRequestOffset || firstMsgOffset < prevRequestOffset {
				rlog.Warning("[BUG] pull message result maybe data wrong", map[string]interface{}{
					"nextBeginOffset":   result.NextBeginOffset,
					"firstMsgOffset":    firstMsgOffset,
					"prevRequestOffset": prevRequestOffset,
				})
			}
		case primitive.PullNoNewMsg, primitive.PullNoMsgMatched:
			request.nextOffset = result.NextBeginOffset
			pc.correctTagsOffset(request)
		case primitive.PullOffsetIllegal:
			rlog.Warning("the pull request offset illegal", map[string]interface{}{
				rlog.LogKeyPullRequest: request.String(),
				"result":               result.String(),
			})
			request.nextOffset = result.NextBeginOffset
			// Flag the queue via WithDropped(true) (presumably what IsDroppd
			// reports, which would end this task on the next iteration -
			// confirm against ProcessQueue). After a 10s pause, store and
			// persist the offset returned by the broker and remove the queue
			// from processQueueTable.
			pq.WithDropped(true)
			time.Sleep(10 * time.Second)
			pc.storage.update(request.mq, request.nextOffset, false)
			pc.storage.persist([]*primitive.MessageQueue{request.mq})
			pc.processQueueTable.Delete(*request.mq)
			rlog.Warning(fmt.Sprintf("fix the pull request offset: %s", request.String()), nil)
		default:
			rlog.Warning(fmt.Sprintf("unknown pull status: %v", result.Status), nil)
			sleepTime = _PullDelayTimeWhenError
		}
	}
}