in consumer/push_consumer.go [629:910]
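// pullMessage runs the pull loop for a single PullRequest (one message queue).
// It starts a companion goroutine that consumes cached messages, then repeatedly
// pulls from the broker, applying flow control and back-off between iterations.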
func (pc *pushConsumer) pullMessage(request *PullRequest) {
rlog.Debug("start a new Pull Message task for PullRequest", map[string]interface{}{
rlog.LogKeyPullRequest: request.String(),
})
var sleepTime time.Duration
pq := request.pq
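// Consumption goroutine: keep submitting cached messages from the process queue
// to the consume service until the consumer shuts down or the queue is dropped.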
go primitive.WithRecover(func() {
for {
select {
case <-pc.done:
rlog.Info("push consumer close pullMessage.", map[string]interface{}{
rlog.LogKeyConsumerGroup: pc.consumerGroup,
})
return
default:
pc.submitToConsume(request.pq, request.mq)
if request.pq.IsDroppd() {
rlog.Info("push consumer quit pullMessage for dropped queue.", map[string]interface{}{
rlog.LogKeyConsumerGroup: pc.consumerGroup,
})
return
}
}
}
})
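// Pull loop: each iteration checks consumer state and flow control, builds a
// pull request, sends it to the broker, and handles the result.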
for {
NEXT:
select {
case <-pc.done:
rlog.Info("push consumer close message handle.", map[string]interface{}{
rlog.LogKeyConsumerGroup: pc.consumerGroup,
})
return
default:
}
if pq.IsDroppd() {
rlog.Debug("the request was dropped, so stop task", map[string]interface{}{
rlog.LogKeyPullRequest: request.String(),
})
return
}
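// Apply the delay decided by the previous iteration: the configured pull
// interval, or a back-off set on error, suspension, or flow control.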
if sleepTime > 0 {
rlog.Debug(fmt.Sprintf("pull MessageQueue: %d sleep %d ms for mq: %v", request.mq.QueueId, sleepTime/time.Millisecond, request.mq), nil)
time.Sleep(sleepTime)
}
// reset time
sleepTime = pc.option.PullInterval.Load()
pq.lastPullTime.Store(time.Now())
err := pc.makeSureStateOK()
if err != nil {
rlog.Warning("consumer state error", map[string]interface{}{
rlog.LogKeyUnderlayError: err.Error(),
})
sleepTime = _PullDelayTimeWhenError
goto NEXT
}
if pc.pause {
rlog.Debug(fmt.Sprintf("consumer [%s] of [%s] was paused, execute pull request [%s] later",
pc.option.InstanceName, pc.consumerGroup, request.String()), nil)
sleepTime = _PullDelayTimeWhenSuspend
goto NEXT
}
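// Flow control by cached message count and size: if the process queue holds
// too many unconsumed messages or bytes, delay this pull round.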
cachedMessageSizeInMiB := int(pq.cachedMsgSize.Load() / Mb)
if pq.cachedMsgCount.Load() > pc.option.PullThresholdForQueue.Load() {
if pc.queueFlowControlTimes%1000 == 0 {
rlog.Warning("the cached message count exceeds the threshold, so do flow control", map[string]interface{}{
"PullThresholdForQueue": pc.option.PullThresholdForQueue.Load(),
"minOffset": pq.Min(),
"maxOffset": pq.Max(),
"count": pq.cachedMsgCount,
"size(MiB)": cachedMessageSizeInMiB,
"flowControlTimes": pc.queueFlowControlTimes,
rlog.LogKeyPullRequest: request.String(),
})
}
pc.queueFlowControlTimes++
sleepTime = _PullDelayTimeWhenFlowControl
goto NEXT
}
if cachedMessageSizeInMiB > int(pc.option.PullThresholdSizeForQueue.Load()) {
if pc.queueFlowControlTimes%1000 == 0 {
rlog.Warning("the cached message size exceeds the threshold, so do flow control", map[string]interface{}{
"PullThresholdSizeForQueue": pc.option.PullThresholdSizeForQueue.Load(),
"minOffset": pq.Min(),
"maxOffset": pq.Max(),
"count": pq.cachedMsgCount,
"size(MiB)": cachedMessageSizeInMiB,
"flowControlTimes": pc.queueFlowControlTimes,
rlog.LogKeyPullRequest: request.String(),
})
}
pc.queueFlowControlTimes++
sleepTime = _PullDelayTimeWhenFlowControl
goto NEXT
}
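// Concurrent consumption also limits the offset span of in-flight messages;
// orderly consumption instead requires the queue lock to be held in the broker,
// and fixes the start offset from the broker on the first locked pull.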
if !pc.consumeOrderly {
if pq.getMaxSpan() > pc.option.ConsumeConcurrentlyMaxSpan {
if pc.queueMaxSpanFlowControlTimes%1000 == 0 {
rlog.Warning("the queue's messages span too long, so do flow control", map[string]interface{}{
"ConsumeConcurrentlyMaxSpan": pc.option.ConsumeConcurrentlyMaxSpan,
"minOffset": pq.Min(),
"maxOffset": pq.Max(),
"maxSpan": pq.getMaxSpan(),
"flowControlTimes": pc.queueFlowControlTimes,
rlog.LogKeyPullRequest: request.String(),
})
}
pc.queueMaxSpanFlowControlTimes++
sleepTime = _PullDelayTimeWhenFlowControl
goto NEXT
}
} else {
if pq.IsLock() {
if !request.lockedFirst {
offset, err := pc.computePullFromWhereWithException(request.mq)
if err != nil {
rlog.Warning("computePullFromWhere from broker error", map[string]interface{}{
rlog.LogKeyUnderlayError: err.Error(),
})
sleepTime = _PullDelayTimeWhenError
goto NEXT
}
brokerBusy := offset < request.nextOffset
rlog.Info("the first time to pull message, so fix offset from broker, offset maybe changed", map[string]interface{}{
rlog.LogKeyPullRequest: request.String(),
rlog.LogKeyValueChangedFrom: request.nextOffset,
rlog.LogKeyValueChangedTo: offset,
"brokerBusy": brokerBusy,
})
if brokerBusy {
rlog.Info("[NOTIFY_ME] the first time to pull message, but pull request offset larger than "+
"broker consume offset", map[string]interface{}{"offset": offset})
}
request.lockedFirst = true
request.nextOffset = offset
}
} else {
rlog.Info("pull message later because not locked in broker", map[string]interface{}{
rlog.LogKeyPullRequest: request.String(),
})
sleepTime = _PullDelayTimeWhenError
goto NEXT
}
}
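// The subscription data may have been removed by a rebalance; without it the
// pull request cannot be built, so retry later.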
v, exist := pc.subscriptionDataTable.Load(request.mq.Topic)
if !exist {
rlog.Info("find the consumer's subscription failed", map[string]interface{}{
rlog.LogKeyPullRequest: request.String(),
})
sleepTime = _PullDelayTimeWhenError
goto NEXT
}
beginTime := time.Now()
var (
commitOffsetEnable bool
commitOffsetValue int64
subExpression string
)
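// In clustering mode, piggyback the locally stored consume offset on the pull
// request so the broker can update its consumer offset.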
if pc.model == Clustering {
commitOffsetValue, _ = pc.storage.readWithException(request.mq, _ReadFromMemory)
if commitOffsetValue > 0 {
commitOffsetEnable = true
}
}
sd := v.(*internal.SubscriptionData)
classFilter := sd.ClassFilterMode
if pc.option.PostSubscriptionWhenPull && !classFilter {
subExpression = sd.SubString
}
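// Build the pull request header; sysFlag encodes the commit-offset, suspend
// (long polling), subscription and class-filter bits.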
sysFlag := buildSysFlag(commitOffsetEnable, true, subExpression != "", classFilter)
pullRequest := &internal.PullMessageRequestHeader{
ConsumerGroup: pc.consumerGroup,
Topic: request.mq.Topic,
QueueId: int32(request.mq.QueueId),
QueueOffset: request.nextOffset,
MaxMsgNums: pc.option.PullBatchSize.Load(),
SysFlag: sysFlag,
CommitOffset: commitOffsetValue,
SubExpression: subExpression,
ExpressionType: string(TAG),
SuspendTimeoutMillis: 20 * time.Second,
BrokerName: request.mq.BrokerName,
}
//
//if data.ExpType == string(TAG) {
// pullRequest.SubVersion = 0
//} else {
// pullRequest.SubVersion = data.SubVersion
//}
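// Resolve the broker address for this queue. When pulling from a slave, the
// commit-offset flag is cleared so the slave is not asked to commit offsets.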
brokerResult := pc.defaultConsumer.tryFindBroker(request.mq)
if brokerResult == nil {
rlog.Warning("no broker found for mq", map[string]interface{}{
rlog.LogKeyPullRequest: request.mq.String(),
})
sleepTime = _PullDelayTimeWhenError
goto NEXT
}
if brokerResult.Slave {
pullRequest.SysFlag = clearCommitOffsetFlag(pullRequest.SysFlag)
}
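// Issue the pull RPC; transport errors and broker timeouts are retried after a delay.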
result, err := pc.client.PullMessage(context.Background(), brokerResult.BrokerAddr, pullRequest)
if err != nil {
rlog.Warning("pull message from broker error", map[string]interface{}{
rlog.LogKeyBroker: brokerResult.BrokerAddr,
rlog.LogKeyUnderlayError: err.Error(),
})
sleepTime = _PullDelayTimeWhenError
goto NEXT
}
if result.Status == primitive.PullBrokerTimeout {
rlog.Warning("pull broker timeout", map[string]interface{}{
rlog.LogKeyBroker: brokerResult.BrokerAddr,
})
sleepTime = _PullDelayTimeWhenError
goto NEXT
}
pc.processPullResult(request.mq, result, sd)
if result.MaxOffset > pq.maxOffsetInQueue {
pq.maxOffsetInQueue = result.MaxOffset
}
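// Dispatch on the pull status: cache found messages, advance the next offset,
// or drop the queue when the requested offset is illegal.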
switch result.Status {
case primitive.PullFound:
rlog.Debug(fmt.Sprintf("Topic: %s, QueueId: %d found messages.", request.mq.Topic, request.mq.QueueId), nil)
prevRequestOffset := request.nextOffset
request.nextOffset = result.NextBeginOffset
rt := time.Since(beginTime) / time.Millisecond
pc.stat.increasePullRT(pc.consumerGroup, request.mq.Topic, int64(rt))
msgFounded := result.GetMessageExts()
firstMsgOffset := int64(math.MaxInt64)
if len(msgFounded) != 0 {
firstMsgOffset = msgFounded[0].QueueOffset
pc.stat.increasePullTPS(pc.consumerGroup, request.mq.Topic, len(msgFounded))
pq.putMessage(msgFounded...)
}
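// Sanity check: the broker should never return offsets behind what was requested.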
if result.NextBeginOffset < prevRequestOffset || firstMsgOffset < prevRequestOffset {
rlog.Warning("[BUG] pull message result maybe data wrong", map[string]interface{}{
"nextBeginOffset": result.NextBeginOffset,
"firstMsgOffset": firstMsgOffset,
"prevRequestOffset": prevRequestOffset,
})
}
case primitive.PullNoNewMsg, primitive.PullNoMsgMatched:
request.nextOffset = result.NextBeginOffset
pc.correctTagsOffset(request)
case primitive.PullOffsetIllegal:
rlog.Warning("the pull request offset illegal", map[string]interface{}{
rlog.LogKeyPullRequest: request.String(),
"result": result.String(),
})
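// Drop the process queue, persist the corrected offset, and remove the queue
// from the table so a later rebalance can recreate it cleanly.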
request.nextOffset = result.NextBeginOffset
pq.WithDropped(true)
time.Sleep(10 * time.Second)
pc.storage.update(request.mq, request.nextOffset, false)
pc.storage.persist([]*primitive.MessageQueue{request.mq})
pc.processQueueTable.Delete(*request.mq)
rlog.Warning(fmt.Sprintf("fix the pull request offset: %s", request.String()), nil)
default:
rlog.Warning(fmt.Sprintf("unknown pull status: %v", result.Status), nil)
sleepTime = _PullDelayTimeWhenError
}
}
}