func()

in consumer/pull_consumer.go [782:954]


// pullMessage runs the pull loop for a single PullRequest. It blocks the
// calling goroutine until the consumer is closed (pc.done fires) or the
// request's process queue is marked as dropped.
//
// Two loops cooperate:
//   - a background goroutine that keeps calling submitToConsume for the
//     request's process queue and message queue;
//   - the loop in the calling goroutine, which pulls from the broker, hands
//     the result to processPullResult, stores found messages in the process
//     queue and advances request.nextOffset.
//
// Recoverable problems (bad consumer state, paused consumer, missing
// subscription, no broker, pull error, broker timeout, unknown status) do not
// end the loop; they only select the delay applied before the next attempt.
func (pc *defaultPullConsumer) pullMessage(request *PullRequest) {
	rlog.Debug("defaultPullConsumer start a new Pull Message task for PullRequest", map[string]interface{}{
		rlog.LogKeyPullRequest: request.String(),
	})
	// Delay applied at the top of the next iteration; zero means "pull right away".
	var sleepTime time.Duration
	pq := request.pq

	// Consumption side: keep submitting the queue for consumption until the
	// consumer is closed or the queue is dropped.
	go primitive.WithRecover(func() {
		for {
			select {
			case <-pc.done:
				rlog.Info("defaultPullConsumer close pullMessage.", map[string]interface{}{
					rlog.LogKeyConsumerGroup: pc.consumerGroup,
				})
				return
			default:
				pc.submitToConsume(request.pq, request.mq)
				if request.pq.IsDroppd() {
					rlog.Info("defaultPullConsumer quit pullMessage for dropped queue.", map[string]interface{}{
						rlog.LogKeyConsumerGroup: pc.consumerGroup,
					})
					return
				}
			}
		}
	})

	// Pull side.
	for {
		// Non-blocking shutdown check at the start of every iteration.
		select {
		case <-pc.done:
			rlog.Info("defaultPullConsumer close message handle.", map[string]interface{}{
				rlog.LogKeyConsumerGroup: pc.consumerGroup,
			})
			return
		default:
		}

		if pq.IsDroppd() {
			rlog.Debug("defaultPullConsumer the request was dropped, so stop task", map[string]interface{}{
				rlog.LogKeyPullRequest: request.String(),
			})
			return
		}
		if sleepTime > 0 {
			rlog.Debug(fmt.Sprintf("defaultPullConsumer pull MessageQueue: %d sleep %d ms for mq: %v", request.mq.QueueId, sleepTime/time.Millisecond, request.mq), nil)
			// Wait on a timer instead of time.Sleep so that closing the
			// consumer ends this loop immediately rather than only after the
			// full delay has elapsed.
			timer := time.NewTimer(sleepTime)
			select {
			case <-pc.done:
				timer.Stop()
				rlog.Info("defaultPullConsumer close message handle.", map[string]interface{}{
					rlog.LogKeyConsumerGroup: pc.consumerGroup,
				})
				return
			case <-timer.C:
			}
		}
		// Reset the delay to the configured pull interval; the error paths
		// below override it before retrying.
		sleepTime = pc.option.PullInterval.Load()
		pq.lastPullTime.Store(time.Now())
		err := pc.makeSureStateOK()
		if err != nil {
			rlog.Warning("defaultPullConsumer state error", map[string]interface{}{
				rlog.LogKeyUnderlayError: err.Error(),
			})
			sleepTime = _PullDelayTimeWhenError
			continue
		}

		if pc.pause {
			rlog.Debug(fmt.Sprintf("defaultPullConsumer [%s] of [%s] was paused, execute pull request [%s] later",
				pc.option.InstanceName, pc.consumerGroup, request.String()), nil)
			sleepTime = _PullDelayTimeWhenSuspend
			continue
		}

		v, exist := pc.subscriptionDataTable.Load(request.mq.Topic)
		if !exist {
			rlog.Info("defaultPullConsumer find the consumer's subscription failed", map[string]interface{}{
				rlog.LogKeyPullRequest: request.String(),
			})
			sleepTime = _PullDelayTimeWhenError
			continue
		}

		nextOffset := pc.nextPullOffset(request.mq, request.nextOffset)
		// Start of the pull round trip; used for the RT statistic on PullFound.
		beginTime := time.Now()
		sd := v.(*internal.SubscriptionData)

		sysFlag := buildSysFlag(false, true, true, false)

		pullRequest := &internal.PullMessageRequestHeader{
			ConsumerGroup:        pc.consumerGroup,
			Topic:                request.mq.Topic,
			QueueId:              int32(request.mq.QueueId),
			QueueOffset:          nextOffset,
			MaxMsgNums:           pc.option.PullBatchSize.Load(),
			SysFlag:              sysFlag,
			CommitOffset:         0,
			SubExpression:        sd.SubString,
			ExpressionType:       string(TAG),
			SuspendTimeoutMillis: 20 * time.Second,
			BrokerName:           request.mq.BrokerName,
		}

		brokerResult := pc.defaultConsumer.tryFindBroker(request.mq)
		if brokerResult == nil {
			rlog.Warning("defaultPullConsumer no broker found for mq", map[string]interface{}{
				rlog.LogKeyPullRequest: request.mq.String(),
			})
			sleepTime = _PullDelayTimeWhenError
			continue
		}

		// When the broker found is a slave, the commit-offset flag is cleared
		// from the request's system flags.
		if brokerResult.Slave {
			pullRequest.SysFlag = clearCommitOffsetFlag(pullRequest.SysFlag)
		}

		rlog.Debug(fmt.Sprintf("defaultPullConsumer pull message from broker: %s, request: %+v", brokerResult.BrokerAddr, pullRequest), nil)

		result, err := pc.client.PullMessage(context.Background(), brokerResult.BrokerAddr, pullRequest)
		if err != nil {
			rlog.Warning("defaultPullConsumer pull message from broker error", map[string]interface{}{
				rlog.LogKeyBroker:        brokerResult.BrokerAddr,
				rlog.LogKeyUnderlayError: err.Error(),
			})
			sleepTime = _PullDelayTimeWhenError
			continue
		}

		if result.Status == primitive.PullBrokerTimeout {
			rlog.Warning("defaultPullConsumer pull broker timeout", map[string]interface{}{
				rlog.LogKeyBroker: brokerResult.BrokerAddr,
			})
			sleepTime = _PullDelayTimeWhenError
			continue
		}

		pc.processPullResult(request.mq, result, sd)

		switch result.Status {
		case primitive.PullFound:
			rlog.Debug(fmt.Sprintf("Topic: %s, QueueId: %d found messages.", request.mq.Topic, request.mq.QueueId), nil)
			prevRequestOffset := request.nextOffset
			request.nextOffset = result.NextBeginOffset

			rt := time.Since(beginTime) / time.Millisecond
			pc.stat.increasePullRT(pc.consumerGroup, request.mq.Topic, int64(rt))

			msgFounded := result.GetMessageExts()
			// Stays at MaxInt64 when no message was returned, so the sanity
			// check below cannot trip on an empty batch.
			firstMsgOffset := int64(math.MaxInt64)
			if len(msgFounded) != 0 {
				firstMsgOffset = msgFounded[0].QueueOffset
				pc.stat.increasePullTPS(pc.consumerGroup, request.mq.Topic, len(msgFounded))
				pq.putMessage(msgFounded...)
			}
			// Sanity check: neither the next begin offset nor the first
			// returned message may lie before the previously recorded offset.
			if result.NextBeginOffset < prevRequestOffset || firstMsgOffset < prevRequestOffset {
				rlog.Warning("[BUG] pull message result maybe data wrong", map[string]interface{}{
					"nextBeginOffset":   result.NextBeginOffset,
					"firstMsgOffset":    firstMsgOffset,
					"prevRequestOffset": prevRequestOffset,
				})
			}
		case primitive.PullNoNewMsg, primitive.PullNoMsgMatched:
			request.nextOffset = result.NextBeginOffset
			pc.correctTagsOffset(request)
		case primitive.PullOffsetIllegal:
			rlog.Warning("defaultPullConsumer the pull request offset illegal", map[string]interface{}{
				rlog.LogKeyPullRequest: request.String(),
				"result":               result.String(),
			})
			request.nextOffset = result.NextBeginOffset
			// Mark the queue dropped; the next iteration's IsDroppd check ends
			// this loop once the offset has been corrected below.
			pq.WithDropped(true)
			// NOTE(review): presumably this pause lets in-flight consumption
			// settle before the stored offset is overwritten - confirm. It is
			// deliberately left uninterruptible so the corrected offset is
			// always updated and persisted, exactly as before.
			time.Sleep(10 * time.Second)
			pc.storage.update(request.mq, request.nextOffset, false)
			pc.storage.persist([]*primitive.MessageQueue{request.mq})
			pc.processQueueTable.Delete(*request.mq)
			rlog.Warning(fmt.Sprintf("defaultPullConsumer fix the pull request offset: %s", request.String()), nil)
		default:
			rlog.Warning(fmt.Sprintf("defaultPullConsumer unknown pull status: %v", result.Status), nil)
			sleepTime = _PullDelayTimeWhenError
		}
	}
}