consumer/push_consumer.go (1,215 lines of code) (raw):

/* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package consumer import ( "context" "fmt" "math" "runtime/pprof" "strconv" "strings" "sync" "sync/atomic" "time" errors2 "github.com/apache/rocketmq-client-go/v2/errors" "github.com/pkg/errors" "github.com/apache/rocketmq-client-go/v2/internal" "github.com/apache/rocketmq-client-go/v2/internal/remote" "github.com/apache/rocketmq-client-go/v2/internal/utils" "github.com/apache/rocketmq-client-go/v2/primitive" "github.com/apache/rocketmq-client-go/v2/rlog" ) // In most scenarios, this is the mostly recommended usage to consume messages. // // Technically speaking, this push client is virtually a wrapper of the underlying pull service. Specifically, on // arrival of messages pulled from brokers, it roughly invokes the registered callback handler to feed the messages. // // See quick start/Consumer in the example module for a typical usage. // // <strong>Thread Safety:</strong> After initialization, the instance can be regarded as thread-safe. const ( Mb = 1024 * 1024 ) type PushConsumerCallback struct { topic string f func(context.Context, ...*primitive.MessageExt) (ConsumeResult, error) } func (callback PushConsumerCallback) UniqueID() string { return callback.topic } type pushConsumer struct { *defaultConsumer queueFlowControlTimes int queueMaxSpanFlowControlTimes int consumeFunc utils.Set submitToConsume func(*processQueue, *primitive.MessageQueue) subscribedTopic map[string]string interceptor primitive.Interceptor queueLock *QueueLock done chan struct{} closeOnce sync.Once crCh map[string]chan struct{} } func NewPushConsumer(opts ...Option) (*pushConsumer, error) { defaultOpts := defaultPushConsumerOptions() for _, apply := range opts { apply(&defaultOpts) } srvs, err := internal.NewNamesrv(defaultOpts.Resolver, defaultOpts.RemotingClientConfig) if err != nil { return nil, errors.Wrap(err, "new Namesrv failed.") } if !defaultOpts.Credentials.IsEmpty() { srvs.SetCredentials(defaultOpts.Credentials) } defaultOpts.Namesrv = srvs if defaultOpts.Namespace != "" { defaultOpts.GroupName = defaultOpts.Namespace + "%" + defaultOpts.GroupName } dc := &defaultConsumer{ client: internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil), consumerGroup: defaultOpts.GroupName, cType: _PushConsume, state: int32(internal.StateCreateJust), prCh: make(chan PullRequest, 4), model: defaultOpts.ConsumerModel, consumeOrderly: defaultOpts.ConsumeOrderly, fromWhere: defaultOpts.FromWhere, allocate: defaultOpts.Strategy, option: defaultOpts, } if dc.client == nil { return nil, fmt.Errorf("GetOrNewRocketMQClient faild") } defaultOpts.Namesrv = dc.client.GetNameSrv() p := &pushConsumer{ defaultConsumer: dc, subscribedTopic: make(map[string]string, 0), queueLock: newQueueLock(), done: make(chan struct{}, 1), consumeFunc: utils.NewSet(), crCh: make(map[string]chan struct{}), } dc.mqChanged = p.messageQueueChanged if p.consumeOrderly { p.submitToConsume = p.consumeMessageOrderly } else { p.submitToConsume = p.consumeMessageConcurrently } p.interceptor = primitive.ChainInterceptors(p.option.Interceptors...) return p, nil } func (pc *pushConsumer) Start() error { var err error pc.once.Do(func() { rlog.Info("the consumer start beginning", map[string]interface{}{ rlog.LogKeyConsumerGroup: pc.consumerGroup, "messageModel": pc.model, "unitMode": pc.unitMode, }) atomic.StoreInt32(&pc.state, int32(internal.StateStartFailed)) err = pc.validate() if err != nil { rlog.Error("the consumer group option validate fail", map[string]interface{}{ rlog.LogKeyConsumerGroup: pc.consumerGroup, rlog.LogKeyUnderlayError: err.Error(), }) err = errors.Wrap(err, "the consumer group option validate fail") return } err = pc.client.RegisterConsumer(pc.consumerGroup, pc) if err != nil { rlog.Error("the consumer group has been created, specify another one", map[string]interface{}{ rlog.LogKeyConsumerGroup: pc.consumerGroup, }) err = errors2.ErrCreated return } err = pc.defaultConsumer.start() if err != nil { return } retryTopic := internal.GetRetryTopic(pc.consumerGroup) pc.crCh[retryTopic] = make(chan struct{}, pc.defaultConsumer.option.ConsumeGoroutineNums) go func() { // todo start clean msg expired for { select { case pr := <-pc.prCh: go func() { pc.pullMessage(&pr) }() case <-pc.done: rlog.Info("push consumer close pullConsumer listener.", map[string]interface{}{ rlog.LogKeyConsumerGroup: pc.consumerGroup, }) return } } }() go primitive.WithRecover(func() { if pc.consumeOrderly { return } time.Sleep(pc.option.ConsumeTimeout) pc.cleanExpiredMsg() ticker := time.NewTicker(pc.option.ConsumeTimeout) defer ticker.Stop() for { select { case <-ticker.C: pc.cleanExpiredMsg() case <-pc.done: rlog.Info("push consumer close cleanExpiredMsg listener.", map[string]interface{}{ rlog.LogKeyConsumerGroup: pc.consumerGroup, }) return } } }) go primitive.WithRecover(func() { // initial lock. if !pc.consumeOrderly { return } time.Sleep(1000 * time.Millisecond) pc.lockAll() lockTicker := time.NewTicker(pc.option.RebalanceLockInterval) defer lockTicker.Stop() for { select { case <-lockTicker.C: pc.lockAll() case <-pc.done: rlog.Info("push consumer close tick.", map[string]interface{}{ rlog.LogKeyConsumerGroup: pc.consumerGroup, }) return } } }) }) if err != nil { return err } pc.client.UpdateTopicRouteInfo() for k := range pc.subscribedTopic { _, exist := pc.topicSubscribeInfoTable.Load(k) if !exist { pc.Shutdown() return fmt.Errorf("the topic=%s route info not found, it may not exist", k) } } pc.client.CheckClientInBroker() pc.client.SendHeartbeatToAllBrokerWithLock() go pc.client.RebalanceImmediately() return err } func (pc *pushConsumer) GetOffsetDiffMap() map[string]int64 { offsetDiffMap := make(map[string]int64) pc.processQueueTable.Range(func(key, value interface{}) bool { mq := key.(primitive.MessageQueue) pq := value.(*processQueue) topic := mq.Topic consumerOffset, _ := pc.storage.readWithException(&mq, _ReadFromMemory) maxOffset := pq.maxOffsetInQueue if consumerOffset < 0 || maxOffset < 0 || consumerOffset > maxOffset { return true } if _, ok := offsetDiffMap[topic]; !ok { offsetDiffMap[topic] = 0 } offsetDiff := offsetDiffMap[topic] offsetDiffMap[topic] = offsetDiff + (maxOffset - consumerOffset) return true }) return offsetDiffMap } func (pc *pushConsumer) Shutdown() error { var err error pc.closeOnce.Do(func() { if pc.option.TraceDispatcher != nil { pc.option.TraceDispatcher.Close() } close(pc.done) if pc.consumeOrderly && pc.model == Clustering { pc.unlockAll(false) } pc.client.UnregisterConsumer(pc.consumerGroup) err = pc.defaultConsumer.shutdown() }) return err } func (pc *pushConsumer) Subscribe(topic string, selector MessageSelector, f func(context.Context, ...*primitive.MessageExt) (ConsumeResult, error)) error { if atomic.LoadInt32(&pc.state) == int32(internal.StateStartFailed) || atomic.LoadInt32(&pc.state) == int32(internal.StateShutdown) { return errors2.ErrStartTopic } if pc.option.Namespace != "" { topic = pc.option.Namespace + "%" + topic } if _, ok := pc.crCh[topic]; !ok { pc.crCh[topic] = make(chan struct{}, pc.defaultConsumer.option.ConsumeGoroutineNums) } data := buildSubscriptionData(topic, selector) pc.subscriptionDataTable.Store(topic, data) pc.subscribedTopic[topic] = "" pc.consumeFunc.Add(&PushConsumerCallback{ f: f, topic: topic, }) return nil } func (pc *pushConsumer) Unsubscribe(topic string) error { if pc.option.Namespace != "" { topic = pc.option.Namespace + "%" + topic } pc.subscriptionDataTable.Delete(topic) return nil } func (pc *pushConsumer) Suspend() { pc.suspend() } func (pc *pushConsumer) Resume() { pc.resume() } func (pc *pushConsumer) Rebalance() { pc.defaultConsumer.doBalance() } func (pc *pushConsumer) RebalanceIfNotPaused() { pc.defaultConsumer.doBalanceIfNotPaused() } func (pc *pushConsumer) PersistConsumerOffset() error { return pc.defaultConsumer.persistConsumerOffset() } func (pc *pushConsumer) UpdateTopicSubscribeInfo(topic string, mqs []*primitive.MessageQueue) { pc.defaultConsumer.updateTopicSubscribeInfo(topic, mqs) } func (pc *pushConsumer) IsSubscribeTopicNeedUpdate(topic string) bool { return pc.defaultConsumer.isSubscribeTopicNeedUpdate(topic) } func (pc *pushConsumer) SubscriptionDataList() []*internal.SubscriptionData { return pc.defaultConsumer.SubscriptionDataList() } func (pc *pushConsumer) IsUnitMode() bool { return pc.unitMode } func (pc *pushConsumer) GetcType() string { return string(pc.cType) } func (pc *pushConsumer) GetModel() string { return pc.model.String() } func (pc *pushConsumer) GetWhere() string { switch pc.fromWhere { case ConsumeFromLastOffset: return "CONSUME_FROM_LAST_OFFSET" case ConsumeFromFirstOffset: return "CONSUME_FROM_FIRST_OFFSET" case ConsumeFromTimestamp: return "CONSUME_FROM_TIMESTAMP" default: return "UNKNOWN" } } func (pc *pushConsumer) ConsumeMessageDirectly(msg *primitive.MessageExt, brokerName string) *internal.ConsumeMessageDirectlyResult { var msgs = []*primitive.MessageExt{msg} var mq = &primitive.MessageQueue{ Topic: msg.Topic, BrokerName: brokerName, QueueId: msg.Queue.QueueId, } beginTime := time.Now() pc.resetRetryAndNamespace(msgs) var result ConsumeResult var err error msgCtx := &primitive.ConsumeMessageContext{ Properties: make(map[string]string), ConsumerGroup: pc.consumerGroup, MQ: mq, Msgs: msgs, } ctx := context.Background() ctx = primitive.WithConsumerCtx(ctx, msgCtx) ctx = primitive.WithMethod(ctx, primitive.ConsumerPush) concurrentCtx := primitive.NewConsumeConcurrentlyContext() concurrentCtx.MQ = *mq ctx = primitive.WithConcurrentlyCtx(ctx, concurrentCtx) result, err = pc.consumeInner(ctx, msgs) consumeRT := time.Now().Sub(beginTime) res := &internal.ConsumeMessageDirectlyResult{ Order: false, AutoCommit: true, SpentTimeMills: int64(consumeRT / time.Millisecond), } if err != nil { msgCtx.Properties[primitive.PropCtxType] = string(primitive.ExceptionReturn) res.ConsumeResult = internal.ThrowException res.Remark = err.Error() } else if result == ConsumeSuccess { msgCtx.Properties[primitive.PropCtxType] = string(primitive.SuccessReturn) res.ConsumeResult = internal.ConsumeSuccess } else if result == ConsumeRetryLater { msgCtx.Properties[primitive.PropCtxType] = string(primitive.FailedReturn) res.ConsumeResult = internal.ConsumeRetryLater } pc.stat.increaseConsumeRT(pc.consumerGroup, mq.Topic, int64(consumeRT/time.Millisecond)) return res } func (pc *pushConsumer) GetConsumerStatus(topic string) *internal.ConsumerStatus { consumerStatus := internal.NewConsumerStatus() mqOffsetMap := pc.storage.getMQOffsetMap(topic) if mqOffsetMap != nil { consumerStatus.MQOffsetMap = mqOffsetMap } return consumerStatus } func (pc *pushConsumer) GetConsumerRunningInfo(stack bool) *internal.ConsumerRunningInfo { info := internal.NewConsumerRunningInfo() pc.subscriptionDataTable.Range(func(key, value interface{}) bool { topic := key.(string) info.SubscriptionData[value.(*internal.SubscriptionData)] = true status := internal.ConsumeStatus{ PullRT: pc.stat.getPullRT(pc.consumerGroup, topic).avgpt, PullTPS: pc.stat.getPullTPS(pc.consumerGroup, topic).tps, ConsumeRT: pc.stat.getConsumeRT(pc.consumerGroup, topic).avgpt, ConsumeOKTPS: pc.stat.getConsumeOKTPS(pc.consumerGroup, topic).tps, ConsumeFailedTPS: pc.stat.getConsumeFailedTPS(pc.consumerGroup, topic).tps, ConsumeFailedMsgs: pc.stat.topicAndGroupConsumeFailedTPS.getStatsDataInHour(topic + "@" + pc.consumerGroup).sum, } info.StatusTable[topic] = status return true }) pc.processQueueTable.Range(func(key, value interface{}) bool { mq := key.(primitive.MessageQueue) pq := value.(*processQueue) pInfo := pq.currentInfo() pInfo.CommitOffset, _ = pc.storage.readWithException(&mq, _ReadMemoryThenStore) info.MQTable[mq] = pInfo return true }) if stack { var buffer strings.Builder err := pprof.Lookup("goroutine").WriteTo(&buffer, 2) if err != nil { rlog.Error("error when get stack ", map[string]interface{}{ "error": err, }) } else { info.JStack = buffer.String() } } nsAddr := "" for _, value := range pc.client.GetNameSrv().AddrList() { nsAddr += fmt.Sprintf("%s;", value) } info.Properties[internal.PropNameServerAddr] = nsAddr info.Properties[internal.PropConsumeType] = string(pc.cType) info.Properties[internal.PropConsumeOrderly] = strconv.FormatBool(pc.consumeOrderly) info.Properties[internal.PropThreadPoolCoreSize] = "-1" info.Properties[internal.PropConsumerStartTimestamp] = strconv.FormatInt(pc.consumerStartTimestamp, 10) return info } func (pc *pushConsumer) messageQueueChanged(topic string, mqAll, mqDivided []*primitive.MessageQueue) { v, exit := pc.subscriptionDataTable.Load(topic) if !exit { return } data := v.(*internal.SubscriptionData) newVersion := time.Now().UnixNano() rlog.Info("the MessageQueue changed, version also updated", map[string]interface{}{ rlog.LogKeyValueChangedFrom: data.SubVersion, rlog.LogKeyValueChangedTo: newVersion, }) data.Lock() data.SubVersion = newVersion data.Unlock() // TODO: optimize count := 0 pc.processQueueTable.Range(func(key, value interface{}) bool { count++ return true }) if count > 0 { if pc.option.PullThresholdForTopic != -1 { newVal := pc.option.PullThresholdForTopic / count if newVal == 0 { newVal = 1 } rlog.Info("The PullThresholdForQueue is changed", map[string]interface{}{ rlog.LogKeyValueChangedFrom: pc.option.PullThresholdForQueue.Load(), rlog.LogKeyValueChangedTo: newVal, }) pc.option.PullThresholdForQueue.Store(int64(newVal)) } if pc.option.PullThresholdSizeForTopic != -1 { newVal := pc.option.PullThresholdSizeForTopic / count if newVal == 0 { newVal = 1 } rlog.Info("The PullThresholdSizeForQueue is changed", map[string]interface{}{ rlog.LogKeyValueChangedFrom: pc.option.PullThresholdSizeForQueue.Load(), rlog.LogKeyValueChangedTo: newVal, }) pc.option.PullThresholdSizeForQueue.Store(int32(newVal)) } } pc.client.SendHeartbeatToAllBrokerWithLock() } func (pc *pushConsumer) validate() error { if err := internal.ValidateGroup(pc.consumerGroup); err != nil { return err } if pc.consumerGroup == internal.DefaultConsumerGroup { // TODO FQA return fmt.Errorf("consumerGroup can't equal [%s], please specify another one", internal.DefaultConsumerGroup) } if len(pc.subscribedTopic) == 0 { rlog.Warning("not subscribe any topic yet", map[string]interface{}{ rlog.LogKeyConsumerGroup: pc.consumerGroup, }) } if pc.option.ConsumeConcurrentlyMaxSpan < 1 || pc.option.ConsumeConcurrentlyMaxSpan > 65535 { if pc.option.ConsumeConcurrentlyMaxSpan == 0 { pc.option.ConsumeConcurrentlyMaxSpan = 1000 } else { return errors.New("option.ConsumeConcurrentlyMaxSpan out of range [1, 65535]") } } if pc.option.PullThresholdForQueue.Load() < 1 || pc.option.PullThresholdForQueue.Load() > 65535 { if pc.option.PullThresholdForQueue.Load() == 0 { pc.option.PullThresholdForQueue.Store(1024) } else { return errors.New("option.PullThresholdForQueue out of range [1, 65535]") } } if pc.option.PullThresholdForTopic < 1 || pc.option.PullThresholdForTopic > 6553500 { if pc.option.PullThresholdForTopic == 0 { pc.option.PullThresholdForTopic = 102400 } else { return errors.New("option.PullThresholdForTopic out of range [1, 6553500]") } } if pc.option.PullThresholdSizeForQueue.Load() < 1 || pc.option.PullThresholdSizeForQueue.Load() > 1024 { if pc.option.PullThresholdSizeForQueue.Load() == 0 { pc.option.PullThresholdSizeForQueue.Store(512) } else { return errors.New("option.PullThresholdSizeForQueue out of range [1, 1024]") } } if pc.option.PullThresholdSizeForTopic < 1 || pc.option.PullThresholdSizeForTopic > 102400 { if pc.option.PullThresholdSizeForTopic == 0 { pc.option.PullThresholdSizeForTopic = 51200 } else { return errors.New("option.PullThresholdSizeForTopic out of range [1, 102400]") } } if interval := pc.option.PullInterval.Load(); interval < 0 || interval > 65535*time.Millisecond { return errors.New("option.PullInterval out of range [0, 65535]") } if pc.option.ConsumeMessageBatchMaxSize < 1 || pc.option.ConsumeMessageBatchMaxSize > 1024 { if pc.option.ConsumeMessageBatchMaxSize == 0 { pc.option.ConsumeMessageBatchMaxSize = 1 } else { return errors.New("option.ConsumeMessageBatchMaxSize out of range [1, 1024]") } } if pullBatchSize := pc.option.PullBatchSize.Load(); pullBatchSize < 1 || pullBatchSize > 1024 { if pullBatchSize == 0 { pc.option.PullBatchSize.Store(32) } else { return errors.New("option.PullBatchSize out of range [1, 1024]") } } if pc.option.ConsumeGoroutineNums < 1 || pc.option.ConsumeGoroutineNums > 100000 { if pc.option.ConsumeGoroutineNums == 0 { pc.option.ConsumeGoroutineNums = 20 } else { return errors.New("option.ConsumeGoroutineNums out of range [1, 100000]") } } return nil } func (pc *pushConsumer) pullMessage(request *PullRequest) { rlog.Debug("start a new Pull Message task for PullRequest", map[string]interface{}{ rlog.LogKeyPullRequest: request.String(), }) var sleepTime time.Duration pq := request.pq go primitive.WithRecover(func() { for { select { case <-pc.done: rlog.Info("push consumer close pullMessage.", map[string]interface{}{ rlog.LogKeyConsumerGroup: pc.consumerGroup, }) return default: pc.submitToConsume(request.pq, request.mq) if request.pq.IsDroppd() { rlog.Info("push consumer quit pullMessage for dropped queue.", map[string]interface{}{ rlog.LogKeyConsumerGroup: pc.consumerGroup, }) return } } } }) for { NEXT: select { case <-pc.done: rlog.Info("push consumer close message handle.", map[string]interface{}{ rlog.LogKeyConsumerGroup: pc.consumerGroup, }) return default: } if pq.IsDroppd() { rlog.Debug("the request was dropped, so stop task", map[string]interface{}{ rlog.LogKeyPullRequest: request.String(), }) return } if sleepTime > 0 { rlog.Debug(fmt.Sprintf("pull MessageQueue: %d sleep %d ms for mq: %v", request.mq.QueueId, sleepTime/time.Millisecond, request.mq), nil) time.Sleep(sleepTime) } // reset time sleepTime = pc.option.PullInterval.Load() pq.lastPullTime.Store(time.Now()) err := pc.makeSureStateOK() if err != nil { rlog.Warning("consumer state error", map[string]interface{}{ rlog.LogKeyUnderlayError: err.Error(), }) sleepTime = _PullDelayTimeWhenError goto NEXT } if pc.pause { rlog.Debug(fmt.Sprintf("consumer [%s] of [%s] was paused, execute pull request [%s] later", pc.option.InstanceName, pc.consumerGroup, request.String()), nil) sleepTime = _PullDelayTimeWhenSuspend goto NEXT } cachedMessageSizeInMiB := int(pq.cachedMsgSize.Load() / Mb) if pq.cachedMsgCount.Load() > pc.option.PullThresholdForQueue.Load() { if pc.queueFlowControlTimes%1000 == 0 { rlog.Warning("the cached message count exceeds the threshold, so do flow control", map[string]interface{}{ "PullThresholdForQueue": pc.option.PullThresholdForQueue.Load(), "minOffset": pq.Min(), "maxOffset": pq.Max(), "count": pq.cachedMsgCount, "size(MiB)": cachedMessageSizeInMiB, "flowControlTimes": pc.queueFlowControlTimes, rlog.LogKeyPullRequest: request.String(), }) } pc.queueFlowControlTimes++ sleepTime = _PullDelayTimeWhenFlowControl goto NEXT } if cachedMessageSizeInMiB > int(pc.option.PullThresholdSizeForQueue.Load()) { if pc.queueFlowControlTimes%1000 == 0 { rlog.Warning("the cached message size exceeds the threshold, so do flow control", map[string]interface{}{ "PullThresholdSizeForQueue": pc.option.PullThresholdSizeForQueue.Load(), "minOffset": pq.Min(), "maxOffset": pq.Max(), "count": pq.cachedMsgCount, "size(MiB)": cachedMessageSizeInMiB, "flowControlTimes": pc.queueFlowControlTimes, rlog.LogKeyPullRequest: request.String(), }) } pc.queueFlowControlTimes++ sleepTime = _PullDelayTimeWhenFlowControl goto NEXT } if !pc.consumeOrderly { if pq.getMaxSpan() > pc.option.ConsumeConcurrentlyMaxSpan { if pc.queueMaxSpanFlowControlTimes%1000 == 0 { rlog.Warning("the queue's messages span too long, so do flow control", map[string]interface{}{ "ConsumeConcurrentlyMaxSpan": pc.option.ConsumeConcurrentlyMaxSpan, "minOffset": pq.Min(), "maxOffset": pq.Max(), "maxSpan": pq.getMaxSpan(), "flowControlTimes": pc.queueFlowControlTimes, rlog.LogKeyPullRequest: request.String(), }) } pc.queueMaxSpanFlowControlTimes++ sleepTime = _PullDelayTimeWhenFlowControl goto NEXT } } else { if pq.IsLock() { if !request.lockedFirst { offset, err := pc.computePullFromWhereWithException(request.mq) if err != nil { rlog.Warning("computePullFromWhere from broker error", map[string]interface{}{ rlog.LogKeyUnderlayError: err.Error(), }) sleepTime = _PullDelayTimeWhenError goto NEXT } brokerBusy := offset < request.nextOffset rlog.Info("the first time to pull message, so fix offset from broker, offset maybe changed", map[string]interface{}{ rlog.LogKeyPullRequest: request.String(), rlog.LogKeyValueChangedFrom: request.nextOffset, rlog.LogKeyValueChangedTo: offset, "brokerBusy": brokerBusy, }) if brokerBusy { rlog.Info("[NOTIFY_ME] the first time to pull message, but pull request offset larger than "+ "broker consume offset", map[string]interface{}{"offset": offset}) } request.lockedFirst = true request.nextOffset = offset } } else { rlog.Info("pull message later because not locked in broker", map[string]interface{}{ rlog.LogKeyPullRequest: request.String(), }) sleepTime = _PullDelayTimeWhenError goto NEXT } } v, exist := pc.subscriptionDataTable.Load(request.mq.Topic) if !exist { rlog.Info("find the consumer's subscription failed", map[string]interface{}{ rlog.LogKeyPullRequest: request.String(), }) sleepTime = _PullDelayTimeWhenError goto NEXT } beginTime := time.Now() var ( commitOffsetEnable bool commitOffsetValue int64 subExpression string ) if pc.model == Clustering { commitOffsetValue, _ = pc.storage.readWithException(request.mq, _ReadFromMemory) if commitOffsetValue > 0 { commitOffsetEnable = true } } sd := v.(*internal.SubscriptionData) classFilter := sd.ClassFilterMode if pc.option.PostSubscriptionWhenPull && !classFilter { subExpression = sd.SubString } sysFlag := buildSysFlag(commitOffsetEnable, true, subExpression != "", classFilter) pullRequest := &internal.PullMessageRequestHeader{ ConsumerGroup: pc.consumerGroup, Topic: request.mq.Topic, QueueId: int32(request.mq.QueueId), QueueOffset: request.nextOffset, MaxMsgNums: pc.option.PullBatchSize.Load(), SysFlag: sysFlag, CommitOffset: commitOffsetValue, SubExpression: subExpression, ExpressionType: string(TAG), SuspendTimeoutMillis: 20 * time.Second, BrokerName: request.mq.BrokerName, } // //if data.ExpType == string(TAG) { // pullRequest.SubVersion = 0 //} else { // pullRequest.SubVersion = data.SubVersion //} brokerResult := pc.defaultConsumer.tryFindBroker(request.mq) if brokerResult == nil { rlog.Warning("no broker found for mq", map[string]interface{}{ rlog.LogKeyPullRequest: request.mq.String(), }) sleepTime = _PullDelayTimeWhenError goto NEXT } if brokerResult.Slave { pullRequest.SysFlag = clearCommitOffsetFlag(pullRequest.SysFlag) } result, err := pc.client.PullMessage(context.Background(), brokerResult.BrokerAddr, pullRequest) if err != nil { rlog.Warning("pull message from broker error", map[string]interface{}{ rlog.LogKeyBroker: brokerResult.BrokerAddr, rlog.LogKeyUnderlayError: err.Error(), }) sleepTime = _PullDelayTimeWhenError goto NEXT } if result.Status == primitive.PullBrokerTimeout { rlog.Warning("pull broker timeout", map[string]interface{}{ rlog.LogKeyBroker: brokerResult.BrokerAddr, }) sleepTime = _PullDelayTimeWhenError goto NEXT } pc.processPullResult(request.mq, result, sd) if result.MaxOffset > pq.maxOffsetInQueue { pq.maxOffsetInQueue = result.MaxOffset } switch result.Status { case primitive.PullFound: rlog.Debug(fmt.Sprintf("Topic: %s, QueueId: %d found messages.", request.mq.Topic, request.mq.QueueId), nil) prevRequestOffset := request.nextOffset request.nextOffset = result.NextBeginOffset rt := time.Now().Sub(beginTime) / time.Millisecond pc.stat.increasePullRT(pc.consumerGroup, request.mq.Topic, int64(rt)) msgFounded := result.GetMessageExts() firstMsgOffset := int64(math.MaxInt64) if len(msgFounded) != 0 { firstMsgOffset = msgFounded[0].QueueOffset pc.stat.increasePullTPS(pc.consumerGroup, request.mq.Topic, len(msgFounded)) pq.putMessage(msgFounded...) } if result.NextBeginOffset < prevRequestOffset || firstMsgOffset < prevRequestOffset { rlog.Warning("[BUG] pull message result maybe data wrong", map[string]interface{}{ "nextBeginOffset": result.NextBeginOffset, "firstMsgOffset": firstMsgOffset, "prevRequestOffset": prevRequestOffset, }) } case primitive.PullNoNewMsg, primitive.PullNoMsgMatched: request.nextOffset = result.NextBeginOffset pc.correctTagsOffset(request) case primitive.PullOffsetIllegal: rlog.Warning("the pull request offset illegal", map[string]interface{}{ rlog.LogKeyPullRequest: request.String(), "result": result.String(), }) request.nextOffset = result.NextBeginOffset pq.WithDropped(true) time.Sleep(10 * time.Second) pc.storage.update(request.mq, request.nextOffset, false) pc.storage.persist([]*primitive.MessageQueue{request.mq}) pc.processQueueTable.Delete(*request.mq) rlog.Warning(fmt.Sprintf("fix the pull request offset: %s", request.String()), nil) default: rlog.Warning(fmt.Sprintf("unknown pull status: %v", result.Status), nil) sleepTime = _PullDelayTimeWhenError } } } func (pc *pushConsumer) correctTagsOffset(pr *PullRequest) { if pr.pq.cachedMsgCount.Load() <= 0 { pc.storage.update(pr.mq, pr.nextOffset, true) } } func (pc *pushConsumer) sendMessageBack(brokerName string, msg *primitive.MessageExt, delayLevel int) bool { var brokerAddr string if len(brokerName) != 0 { brokerAddr = pc.defaultConsumer.client.GetNameSrv().FindBrokerAddrByName(brokerName) } else { brokerAddr = msg.StoreHost } resp, err := pc.client.InvokeSync(context.Background(), brokerAddr, pc.buildSendBackRequest(msg, delayLevel), 3*time.Second) if err != nil || resp.Code != internal.ResSuccess { // send back as a normal message return pc.defaultConsumer.sendMessageBackAsNormal(msg, pc.getMaxReconsumeTimes()) } return true } func (pc *pushConsumer) buildSendBackRequest(msg *primitive.MessageExt, delayLevel int) *remote.RemotingCommand { req := &internal.ConsumerSendMsgBackRequestHeader{ Group: pc.consumerGroup, OriginTopic: msg.Topic, Offset: msg.CommitLogOffset, DelayLevel: delayLevel, OriginMsgId: msg.MsgId, MaxReconsumeTimes: pc.getMaxReconsumeTimes(), BrokerName: msg.Queue.BrokerName, } return remote.NewRemotingCommand(internal.ReqConsumerSendMsgBack, req, nil) } func (pc *pushConsumer) suspend() { pc.pause = true rlog.Info(fmt.Sprintf("suspend consumer: %s", pc.consumerGroup), nil) } func (pc *pushConsumer) resume() { pc.pause = false pc.doBalance() rlog.Info(fmt.Sprintf("resume consumer: %s", pc.consumerGroup), nil) } func (pc *pushConsumer) ResetOffset(topic string, table map[primitive.MessageQueue]int64) { //topic := cmd.ExtFields["topic"] //group := cmd.ExtFields["group"] //if topic == "" || group == "" { // rlog.Warning("received reset offset command from: %s, but missing params.", from) // return //} //t, err := strconv.ParseInt(cmd.ExtFields["timestamp"], 10, 64) //if err != nil { // rlog.Warning("received reset offset command from: %s, but parse time error: %s", err.Error()) // return //} //rlog.Infof("invoke reset offset operation from broker. brokerAddr=%s, topic=%s, group=%s, timestamp=%v", // from, topic, group, t) // //offsetTable := make(map[MessageQueue]int64, 0) //err = json.Unmarshal(cmd.Body, &offsetTable) //if err != nil { // rlog.Warning("received reset offset command from: %s, but parse offset table: %s", err.Error()) // return //} //v, exist := c.consumerMap.Load(group) //if !exist { // rlog.Infof("[reset-offset] consumer dose not exist. group=%s", group) // return //} pc.suspend() defer pc.resume() mqs := make([]*primitive.MessageQueue, 0) copyPc := sync.Map{} pc.processQueueTable.Range(func(key, value interface{}) bool { mq := key.(primitive.MessageQueue) pq := value.(*processQueue) if _, ok := table[mq]; ok && mq.Topic == topic { pq.WithDropped(true) pq.clear() } mqs = append(mqs, &mq) copyPc.Store(&mq, pq) return true }) time.Sleep(10 * time.Second) for _, mq := range mqs { if _, ok := table[*mq]; ok { pc.storage.update(mq, table[*mq], false) v, exist := copyPc.Load(mq) if !exist { continue } pq := v.(*processQueue) pc.removeUnnecessaryMessageQueue(mq, pq) pc.processQueueTable.Delete(*mq) } } } func (pc *pushConsumer) removeUnnecessaryMessageQueue(mq *primitive.MessageQueue, pq *processQueue) bool { pc.defaultConsumer.removeUnnecessaryMessageQueue(mq, pq) if !pc.consumeOrderly || Clustering != pc.model { return true } // TODO orderly return true } func (pc *pushConsumer) consumeInner(ctx context.Context, subMsgs []*primitive.MessageExt) (ConsumeResult, error) { if len(subMsgs) == 0 { return ConsumeRetryLater, errors.New("msg list empty") } f, exist := pc.consumeFunc.Contains(subMsgs[0].Topic) // fix lost retry message if !exist && strings.HasPrefix(subMsgs[0].Topic, internal.RetryGroupTopicPrefix) { f, exist = pc.consumeFunc.Contains(subMsgs[0].GetProperty(primitive.PropertyRetryTopic)) } if !exist { return ConsumeRetryLater, fmt.Errorf("the consume callback missing for topic: %s", subMsgs[0].Topic) } callback, ok := f.(*PushConsumerCallback) if !ok { return ConsumeRetryLater, fmt.Errorf("the consume callback assert failed for topic: %s", subMsgs[0].Topic) } if pc.interceptor == nil { return callback.f(ctx, subMsgs...) } else { var container ConsumeResultHolder err := pc.interceptor(ctx, subMsgs, &container, func(ctx context.Context, req, reply interface{}) error { msgs := req.([]*primitive.MessageExt) r, e := callback.f(ctx, msgs...) realReply := reply.(*ConsumeResultHolder) realReply.ConsumeResult = r msgCtx, _ := primitive.GetConsumerCtx(ctx) msgCtx.Success = realReply.ConsumeResult == ConsumeSuccess if realReply.ConsumeResult == ConsumeSuccess { msgCtx.Properties[primitive.PropCtxType] = string(primitive.SuccessReturn) } else { msgCtx.Properties[primitive.PropCtxType] = string(primitive.FailedReturn) } return e }) return container.ConsumeResult, err } } // resetRetryAndNamespace modify retry message. func (pc *pushConsumer) resetRetryAndNamespace(subMsgs []*primitive.MessageExt) { groupTopic := internal.RetryGroupTopicPrefix + pc.consumerGroup beginTime := time.Now() for idx := range subMsgs { msg := subMsgs[idx] retryTopic := msg.GetProperty(primitive.PropertyRetryTopic) if retryTopic != "" && groupTopic == msg.Topic { msg.Topic = retryTopic } subMsgs[idx].WithProperty(primitive.PropertyConsumeStartTime, strconv.FormatInt( beginTime.UnixNano()/int64(time.Millisecond), 10)) } } func (pc *pushConsumer) consumeMessageConcurrently(pq *processQueue, mq *primitive.MessageQueue) { msgs := pq.getMessages() if msgs == nil { return } limiter := pc.option.Limiter limiterOn := limiter != nil if _, ok := pc.crCh[mq.Topic]; !ok { pc.crCh[mq.Topic] = make(chan struct{}, pc.defaultConsumer.option.ConsumeGoroutineNums) } for count := 0; count < len(msgs); count++ { var subMsgs []*primitive.MessageExt if count+pc.option.ConsumeMessageBatchMaxSize > len(msgs) { subMsgs = msgs[count:] count = len(msgs) } else { next := count + pc.option.ConsumeMessageBatchMaxSize subMsgs = msgs[count:next] count = next - 1 } if limiterOn { limiter(utils.WithoutNamespace(mq.Topic)) } pc.crCh[mq.Topic] <- struct{}{} go primitive.WithRecover(func() { defer func() { if err := recover(); err != nil { if primitive.DefaultPanicHandler != nil { primitive.DefaultPanicHandler(err) } rlog.Error("consumeMessageConcurrently panic", map[string]interface{}{ rlog.LogKeyUnderlayError: err, rlog.LogKeyStack: utils.GetStackAsString(false), rlog.LogKeyConsumerGroup: pc.consumerGroup, }) } <-pc.crCh[mq.Topic] }() RETRY: if pq.IsDroppd() { rlog.Info("the message queue not be able to consume, because it was dropped", map[string]interface{}{ rlog.LogKeyMessageQueue: mq.String(), rlog.LogKeyConsumerGroup: pc.consumerGroup, }) return } beginTime := time.Now() pc.resetRetryAndNamespace(subMsgs) var result ConsumeResult var err error msgCtx := &primitive.ConsumeMessageContext{ Properties: make(map[string]string), ConsumerGroup: pc.consumerGroup, MQ: mq, Msgs: subMsgs, } ctx := context.Background() ctx = primitive.WithConsumerCtx(ctx, msgCtx) ctx = primitive.WithMethod(ctx, primitive.ConsumerPush) concurrentCtx := primitive.NewConsumeConcurrentlyContext() concurrentCtx.MQ = *mq ctx = primitive.WithConcurrentlyCtx(ctx, concurrentCtx) result, err = pc.consumeInner(ctx, subMsgs) consumeRT := time.Now().Sub(beginTime) if err != nil { rlog.Warning("consumeMessageCurrently error", map[string]interface{}{ rlog.LogKeyUnderlayError: err, rlog.LogKeyMessages: msgs, rlog.LogKeyMessageQueue: mq, rlog.LogKeyConsumerGroup: pc.consumerGroup, }) msgCtx.Properties[primitive.PropCtxType] = string(primitive.ExceptionReturn) } else if consumeRT >= pc.option.ConsumeTimeout { rlog.Warning("consumeMessageCurrently time out", map[string]interface{}{ rlog.LogKeyMessages: msgs, rlog.LogKeyMessageQueue: mq, rlog.LogKeyConsumerGroup: pc.consumerGroup, }) msgCtx.Properties[primitive.PropCtxType] = string(primitive.TimeoutReturn) } else if result == ConsumeSuccess { msgCtx.Properties[primitive.PropCtxType] = string(primitive.SuccessReturn) } else if result == ConsumeRetryLater { msgCtx.Properties[primitive.PropCtxType] = string(primitive.FailedReturn) } pc.stat.increaseConsumeRT(pc.consumerGroup, mq.Topic, int64(consumeRT/time.Millisecond)) if !pq.IsDroppd() { msgBackFailed := make([]*primitive.MessageExt, 0) msgBackSucceed := make([]*primitive.MessageExt, 0) if result == ConsumeSuccess { pc.stat.increaseConsumeOKTPS(pc.consumerGroup, mq.Topic, len(subMsgs)) msgBackSucceed = subMsgs } else { pc.stat.increaseConsumeFailedTPS(pc.consumerGroup, mq.Topic, len(subMsgs)) if pc.model == BroadCasting { for i := 0; i < len(subMsgs); i++ { rlog.Warning("BROADCASTING, the message consume failed, drop it", map[string]interface{}{ "message": subMsgs[i], }) } } else { for i := 0; i < len(subMsgs); i++ { msg := subMsgs[i] if pc.sendMessageBack(mq.BrokerName, msg, concurrentCtx.DelayLevelWhenNextConsume) { msgBackSucceed = append(msgBackSucceed, msg) } else { msg.ReconsumeTimes += 1 msgBackFailed = append(msgBackFailed, msg) } } } } offset := pq.removeMessage(msgBackSucceed...) if offset >= 0 && !pq.IsDroppd() { pc.storage.update(mq, int64(offset), true) } if len(msgBackFailed) > 0 { subMsgs = msgBackFailed time.Sleep(5 * time.Second) goto RETRY } } else { rlog.Warning("processQueue is dropped without process consume result.", map[string]interface{}{ rlog.LogKeyMessageQueue: mq, "message": subMsgs, }) } }) } } func (pc *pushConsumer) consumeMessageOrderly(pq *processQueue, mq *primitive.MessageQueue) { if pq.IsDroppd() { rlog.Warning("the message queue not be able to consume, because it's dropped.", map[string]interface{}{ rlog.LogKeyMessageQueue: mq.String(), }) return } lock := pc.queueLock.fetchLock(*mq) lock.Lock() defer lock.Unlock() if pc.model == BroadCasting || (pq.IsLock() && !pq.isLockExpired()) { beginTime := time.Now() continueConsume := true for continueConsume { if pq.IsDroppd() { rlog.Warning("the message queue not be able to consume, because it's dropped.", map[string]interface{}{ rlog.LogKeyMessageQueue: mq.String(), }) break } if pc.model == Clustering { if !pq.IsLock() { rlog.Warning("the message queue not locked, so consume later", map[string]interface{}{ rlog.LogKeyMessageQueue: mq.String(), }) pc.tryLockLaterAndReconsume(mq, 10) return } if pq.isLockExpired() { rlog.Warning("the message queue lock expired, so consume later", map[string]interface{}{ rlog.LogKeyMessageQueue: mq.String(), }) pc.tryLockLaterAndReconsume(mq, 10) return } } interval := time.Now().Sub(beginTime) if interval > pc.option.MaxTimeConsumeContinuously { time.Sleep(10 * time.Millisecond) return } batchSize := pc.option.ConsumeMessageBatchMaxSize msgs := pq.takeMessages(batchSize) pc.resetRetryAndNamespace(msgs) if len(msgs) == 0 { continueConsume = false break } // TODO: add message consumer hook beginTime = time.Now() ctx := context.Background() msgCtx := &primitive.ConsumeMessageContext{ Properties: make(map[string]string), ConsumerGroup: pc.consumerGroup, MQ: mq, Msgs: msgs, } ctx = primitive.WithConsumerCtx(ctx, msgCtx) ctx = primitive.WithMethod(ctx, primitive.ConsumerPush) orderlyCtx := primitive.NewConsumeOrderlyContext() orderlyCtx.MQ = *mq ctx = primitive.WithOrderlyCtx(ctx, orderlyCtx) pq.lockConsume.Lock() result, err := pc.consumeInner(ctx, msgs) if err != nil { rlog.Warning("consumeMessage orderly error", map[string]interface{}{ rlog.LogKeyUnderlayError: err, rlog.LogKeyMessages: msgs, rlog.LogKeyMessageQueue: mq.String(), rlog.LogKeyConsumerGroup: pc.consumerGroup, }) } pq.lockConsume.Unlock() if result == Rollback || result == SuspendCurrentQueueAMoment { rlog.Warning("consumeMessage Orderly return not OK", map[string]interface{}{ rlog.LogKeyConsumerGroup: pc.consumerGroup, "messages": msgs, rlog.LogKeyMessageQueue: mq, }) } // just put consumeResult in consumerMessageCtx //interval = time.Now().Sub(beginTime) //consumeReult := SuccessReturn //if interval > pc.option.ConsumeTimeout { // consumeReult = TimeoutReturn //} else if SuspendCurrentQueueAMoment == result { // consumeReult = FailedReturn //} else if ConsumeSuccess == result { // consumeReult = SuccessReturn //} // process result commitOffset := int64(-1) if pc.option.AutoCommit { switch result { case Commit, Rollback: rlog.Warning("the message queue consume result is illegal, we think you want to ack these message: %v", map[string]interface{}{ rlog.LogKeyMessageQueue: mq, }) case ConsumeSuccess: commitOffset = pq.commit() case SuspendCurrentQueueAMoment: if pc.checkReconsumeTimes(msgs) { pq.makeMessageToCosumeAgain(msgs...) time.Sleep(time.Duration(orderlyCtx.SuspendCurrentQueueTimeMillis) * time.Millisecond) continueConsume = false } else { commitOffset = pq.commit() } default: } } else { switch result { case ConsumeSuccess: case Commit: commitOffset = pq.commit() case Rollback: // pq.rollback time.Sleep(time.Duration(orderlyCtx.SuspendCurrentQueueTimeMillis) * time.Millisecond) continueConsume = false case SuspendCurrentQueueAMoment: if pc.checkReconsumeTimes(msgs) { time.Sleep(time.Duration(orderlyCtx.SuspendCurrentQueueTimeMillis) * time.Millisecond) continueConsume = false } default: } } if commitOffset > 0 && !pq.IsDroppd() { _ = pc.updateOffset(mq, commitOffset) } } } else { if pq.IsDroppd() { rlog.Warning("the message queue not be able to consume, because it's dropped.", map[string]interface{}{ rlog.LogKeyMessageQueue: mq.String(), }) } pc.tryLockLaterAndReconsume(mq, 100) } } func (pc *pushConsumer) checkReconsumeTimes(msgs []*primitive.MessageExt) bool { suspend := false if len(msgs) != 0 { maxReconsumeTimes := pc.getOrderlyMaxReconsumeTimes() for _, msg := range msgs { if msg.ReconsumeTimes > maxReconsumeTimes { rlog.Warning(fmt.Sprintf("msg will be send to retry topic due to ReconsumeTimes > %d, \n", maxReconsumeTimes), nil) msg.WithProperty("RECONSUME_TIME", strconv.Itoa(int(msg.ReconsumeTimes))) if !pc.sendMessageBack(msg.Queue.BrokerName, msg, -1) { suspend = true msg.ReconsumeTimes += 1 } } else { suspend = true msg.ReconsumeTimes += 1 } } } return suspend } func (pc *pushConsumer) getOrderlyMaxReconsumeTimes() int32 { if pc.option.MaxReconsumeTimes == -1 { return math.MaxInt32 } else { return pc.option.MaxReconsumeTimes } } func (pc *pushConsumer) getMaxReconsumeTimes() int32 { if pc.option.MaxReconsumeTimes == -1 { return 16 } else { return pc.option.MaxReconsumeTimes } } func (pc *pushConsumer) tryLockLaterAndReconsume(mq *primitive.MessageQueue, delay int64) { time.Sleep(time.Duration(delay) * time.Millisecond) if pc.lock(mq) == true { pc.submitConsumeRequestLater(10) } else { pc.submitConsumeRequestLater(3000) } } func (pc *pushConsumer) submitConsumeRequestLater(suspendTimeMillis int64) { if suspendTimeMillis == -1 { suspendTimeMillis = int64(pc.option.SuspendCurrentQueueTimeMillis / time.Millisecond) } if suspendTimeMillis < 10 { suspendTimeMillis = 10 } else if suspendTimeMillis > 30000 { suspendTimeMillis = 30000 } time.Sleep(time.Duration(suspendTimeMillis) * time.Millisecond) } func (pc *pushConsumer) cleanExpiredMsg() { pc.processQueueTable.Range(func(key, value interface{}) bool { pq := value.(*processQueue) pq.cleanExpiredMsg(pc) return true }) }