consumer/pull_consumer.go (857 lines of code) (raw):

/* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package consumer import ( "context" "fmt" "math" "runtime/pprof" "strconv" "strings" "sync" "sync/atomic" "time" errors2 "github.com/apache/rocketmq-client-go/v2/errors" "github.com/apache/rocketmq-client-go/v2/internal/remote" "github.com/apache/rocketmq-client-go/v2/internal/utils" "github.com/pkg/errors" "github.com/apache/rocketmq-client-go/v2/internal" "github.com/apache/rocketmq-client-go/v2/primitive" "github.com/apache/rocketmq-client-go/v2/rlog" ) // ErrNoNewMsg returns a "no new message found". Occurs only when no new message found from broker var ErrNoNewMsg = errors.New("no new message found") func IsNoNewMsgError(err error) bool { return err == ErrNoNewMsg } type ConsumeRequest struct { messageQueue *primitive.MessageQueue processQueue *processQueue msgList []*primitive.MessageExt } func (cr *ConsumeRequest) GetMsgList() []*primitive.MessageExt { return cr.msgList } func (cr *ConsumeRequest) GetMQ() *primitive.MessageQueue { return cr.messageQueue } func (cr *ConsumeRequest) GetPQ() *processQueue { return cr.processQueue } type SubscriptionType int const ( None SubscriptionType = iota Subscribe Assign ) type defaultPullConsumer struct { *defaultConsumer topic string selector MessageSelector GroupName string Model MessageModel SubType SubscriptionType UnitMode bool nextQueueSequence int64 allocateQueues []*primitive.MessageQueue mq2seekOffset sync.Map // key:primitive.MessageQueue,value:seekOffset done chan struct{} closeOnce sync.Once consumeRequestCache chan *ConsumeRequest submitToConsume func(*processQueue, *primitive.MessageQueue) interceptor primitive.Interceptor } func NewPullConsumer(options ...Option) (*defaultPullConsumer, error) { defaultOpts := defaultPullConsumerOptions() for _, apply := range options { apply(&defaultOpts) } srvs, err := internal.NewNamesrv(defaultOpts.Resolver, defaultOpts.RemotingClientConfig) if err != nil { return nil, errors.Wrap(err, "new Namesrv failed.") } if !defaultOpts.Credentials.IsEmpty() { srvs.SetCredentials(defaultOpts.Credentials) } defaultOpts.Namesrv = srvs dc := &defaultConsumer{ client: internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil), consumerGroup: utils.WrapNamespace(defaultOpts.Namespace, defaultOpts.GroupName), cType: _PullConsume, state: int32(internal.StateCreateJust), prCh: make(chan PullRequest, 4), model: defaultOpts.ConsumerModel, option: defaultOpts, allocate: defaultOpts.Strategy, } if dc.client == nil { return nil, fmt.Errorf("GetOrNewRocketMQClient faild") } defaultOpts.Namesrv = dc.client.GetNameSrv() c := &defaultPullConsumer{ defaultConsumer: dc, done: make(chan struct{}, 1), consumeRequestCache: make(chan *ConsumeRequest, 4), GroupName: dc.option.GroupName, } dc.mqChanged = c.messageQueueChanged c.submitToConsume = c.consumeMessageConcurrently c.interceptor = primitive.ChainInterceptors(c.option.Interceptors...) return c, nil } func (pc *defaultPullConsumer) GetTopicRouteInfo(topic string) ([]*primitive.MessageQueue, error) { topicWithNs := utils.WrapNamespace(pc.option.Namespace, topic) value, exist := pc.defaultConsumer.topicSubscribeInfoTable.Load(topicWithNs) if exist { return value.([]*primitive.MessageQueue), nil } pc.client.UpdateTopicRouteInfo() value, exist = pc.defaultConsumer.topicSubscribeInfoTable.Load(topicWithNs) if !exist { return nil, errors2.ErrRouteNotFound } return value.([]*primitive.MessageQueue), nil } func (pc *defaultPullConsumer) Subscribe(topic string, selector MessageSelector) error { if atomic.LoadInt32(&pc.state) == int32(internal.StateStartFailed) || atomic.LoadInt32(&pc.state) == int32(internal.StateShutdown) { return errors2.ErrStartTopic } if pc.SubType == Assign { return errors2.ErrSubscriptionType } if pc.SubType == None { pc.SubType = Subscribe } topic = utils.WrapNamespace(pc.option.Namespace, topic) data := buildSubscriptionData(topic, selector) pc.subscriptionDataTable.Store(topic, data) pc.topic = topic pc.selector = selector return nil } func (pc *defaultPullConsumer) Unsubscribe(topic string) error { if pc.SubType == Assign { return errors2.ErrSubscriptionType } topic = utils.WrapNamespace(pc.option.Namespace, topic) pc.subscriptionDataTable.Delete(topic) return nil } func (pc *defaultPullConsumer) Assign(topic string, mqs []*primitive.MessageQueue) error { if pc.SubType == Subscribe { return errors2.ErrSubscriptionType } if pc.SubType == None { pc.SubType = Assign } topic = utils.WrapNamespace(pc.option.Namespace, topic) data := buildSubscriptionData(topic, MessageSelector{TAG, _SubAll}) pc.topic = topic pc.subscriptionDataTable.Store(topic, data) oldQueues := pc.allocateQueues pc.allocateQueues = mqs rlog.Info("pull consumer assign new mqs", map[string]interface{}{ "topic": topic, "group": pc.GroupName, "oldMqs": oldQueues, "newMqs": mqs, }) if pc.isRunning() { pc.Rebalance() } return nil } func (pc *defaultPullConsumer) nextPullOffset(mq *primitive.MessageQueue, originOffset int64) int64 { if pc.SubType != Assign { return originOffset } value, exist := pc.mq2seekOffset.LoadAndDelete(*mq) if !exist { return originOffset } else { nextOffset := value.(int64) _ = pc.updateOffset(mq, nextOffset) rlog.Info("pull consumer assign new offset", map[string]interface{}{ "group": pc.GroupName, "mq": mq, "offset": nextOffset, }) return nextOffset } } func (pc *defaultPullConsumer) Start() error { var err error pc.once.Do(func() { err = pc.validate() if err != nil { rlog.Error("the consumer group option validate fail", map[string]interface{}{ rlog.LogKeyConsumerGroup: pc.consumerGroup, rlog.LogKeyUnderlayError: err.Error(), }) err = errors.Wrap(err, "the consumer group option validate fail") return } err = pc.defaultConsumer.client.RegisterConsumer(pc.consumerGroup, pc) if err != nil { rlog.Error("defaultPullConsumer the consumer group has been created, specify another one", map[string]interface{}{ rlog.LogKeyConsumerGroup: pc.consumerGroup, }) err = errors2.ErrCreated return } err = pc.start() if err != nil { return } atomic.StoreInt32(&pc.state, int32(internal.StateRunning)) go func() { for { select { case pr := <-pc.prCh: go func() { pc.pullMessage(&pr) }() case <-pc.done: rlog.Info("defaultPullConsumer close PullRequest listener.", map[string]interface{}{ rlog.LogKeyConsumerGroup: pc.consumerGroup, }) return } } }() }) if err != nil { return err } pc.client.UpdateTopicRouteInfo() _, exist := pc.topicSubscribeInfoTable.Load(pc.topic) if !exist { err = pc.Shutdown() if err != nil { rlog.Error("defaultPullConsumer.Shutdown . route info not found, it may not exist", map[string]interface{}{ rlog.LogKeyTopic: pc.topic, rlog.LogKeyUnderlayError: err, }) } return fmt.Errorf("the topic=%s route info not found, it may not exist", pc.topic) } pc.client.CheckClientInBroker() pc.client.SendHeartbeatToAllBrokerWithLock() go pc.client.RebalanceImmediately() return err } func (pc *defaultPullConsumer) Poll(ctx context.Context, timeout time.Duration) (*ConsumeRequest, error) { ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() select { case <-ctx.Done(): return nil, ErrNoNewMsg case cr := <-pc.consumeRequestCache: if cr.processQueue.IsDroppd() { rlog.Info("defaultPullConsumer poll the message queue not be able to consume, because it was dropped", map[string]interface{}{ rlog.LogKeyMessageQueue: cr.messageQueue.String(), rlog.LogKeyConsumerGroup: pc.consumerGroup, }) return nil, ErrNoNewMsg } if len(cr.GetMsgList()) == 0 { return nil, ErrNoNewMsg } return cr, nil } } func (pc *defaultPullConsumer) ACK(ctx context.Context, cr *ConsumeRequest, result ConsumeResult) { if cr == nil { return } pq := cr.processQueue mq := cr.messageQueue msgList := cr.msgList if len(msgList) == 0 || pq == nil || mq == nil { return } RETRY: if pq.IsDroppd() { rlog.Info("defaultPullConsumer the message queue not be able to consume, because it was dropped", map[string]interface{}{ rlog.LogKeyMessageQueue: mq.String(), rlog.LogKeyConsumerGroup: pc.consumerGroup, }) return } pc.resetRetryAndNamespace(msgList) msgCtx := &primitive.ConsumeMessageContext{ Properties: make(map[string]string), ConsumerGroup: pc.consumerGroup, MQ: mq, Msgs: msgList, } ctx = primitive.WithConsumerCtx(ctx, msgCtx) ctx = primitive.WithMethod(ctx, primitive.ConsumerPull) concurrentCtx := primitive.NewConsumeConcurrentlyContext() concurrentCtx.MQ = *mq ctx = primitive.WithConcurrentlyCtx(ctx, concurrentCtx) if result == ConsumeSuccess { msgCtx.Properties[primitive.PropCtxType] = string(primitive.SuccessReturn) msgCtx.Success = true } else { msgCtx.Properties[primitive.PropCtxType] = string(primitive.FailedReturn) msgCtx.Success = false } if pc.interceptor != nil { pc.interceptor(ctx, msgList, nil, func(ctx context.Context, req, reply interface{}) error { return nil }) } if !pq.IsDroppd() { msgBackFailed := make([]*primitive.MessageExt, 0) msgBackSucceed := make([]*primitive.MessageExt, 0) if result == ConsumeSuccess { pc.stat.increaseConsumeOKTPS(pc.consumerGroup, mq.Topic, len(msgList)) msgBackSucceed = msgList } else { pc.stat.increaseConsumeFailedTPS(pc.consumerGroup, mq.Topic, len(msgList)) if pc.model == BroadCasting { for i := 0; i < len(msgList); i++ { rlog.Warning("defaultPullConsumer BROADCASTING, the message consume failed, drop it", map[string]interface{}{ "message": msgList[i], }) } } else { for i := 0; i < len(msgList); i++ { msg := msgList[i] if pc.sendMessageBack(mq.BrokerName, msg, concurrentCtx.DelayLevelWhenNextConsume) { msgBackSucceed = append(msgBackSucceed, msg) } else { msg.ReconsumeTimes += 1 msgBackFailed = append(msgBackFailed, msg) } } } } offset := pq.removeMessage(msgBackSucceed...) if offset >= 0 && !pq.IsDroppd() { pc.storage.update(mq, int64(offset), true) } if len(msgBackFailed) > 0 { msgList = msgBackFailed time.Sleep(5 * time.Second) goto RETRY } } else { rlog.Warning("defaultPullConsumer processQueue is dropped without process consume result.", map[string]interface{}{ rlog.LogKeyMessageQueue: mq, "message": msgList, }) } } // resetRetryAndNamespace modify retry message. func (pc *defaultPullConsumer) resetRetryAndNamespace(msgList []*primitive.MessageExt) { groupTopic := internal.RetryGroupTopicPrefix + pc.consumerGroup beginTime := time.Now() for idx := range msgList { msg := msgList[idx] retryTopic := msg.GetProperty(primitive.PropertyRetryTopic) if retryTopic == "" && groupTopic == msg.Topic { msg.Topic = retryTopic } msgList[idx].WithProperty(primitive.PropertyConsumeStartTime, strconv.FormatInt( beginTime.UnixNano()/int64(time.Millisecond), 10)) } } func (pc *defaultPullConsumer) Pull(ctx context.Context, numbers int) (*primitive.PullResult, error) { mq := pc.getNextQueueOf(pc.topic) if mq == nil { return nil, fmt.Errorf("prepare to pull topic: %s, but no queue is founded", pc.topic) } data := buildSubscriptionData(mq.Topic, pc.selector) nextOffset, err := pc.nextOffsetOf(mq) if err != nil { return nil, err } result, err := pc.pull(context.Background(), mq, data, nextOffset, numbers) if err != nil { return nil, err } pc.processPullResult(mq, result, data) if pc.interceptor != nil { msgCtx := &primitive.ConsumeMessageContext{ Properties: make(map[string]string), ConsumerGroup: pc.consumerGroup, MQ: mq, Msgs: result.GetMessageExts(), Success: true, } ctx = primitive.WithConsumerCtx(ctx, msgCtx) ctx = primitive.WithMethod(ctx, primitive.ConsumerPull) pc.interceptor(ctx, result.GetMessageExts(), nil, func(ctx context.Context, req, reply interface{}) error { return nil }) } return result, nil } func (pc *defaultPullConsumer) getNextQueueOf(topic string) *primitive.MessageQueue { var queues []*primitive.MessageQueue var err error if len(pc.allocateQueues) == 0 { topic = utils.WrapNamespace(pc.option.Namespace, topic) queues, err = pc.defaultConsumer.client.GetNameSrv().FetchSubscribeMessageQueues(topic) if err != nil { rlog.Error("get next mq error", map[string]interface{}{ rlog.LogKeyTopic: topic, rlog.LogKeyUnderlayError: err.Error(), }) return nil } if len(queues) == 0 { rlog.Warning("defaultPullConsumer.getNextQueueOf len is 0", map[string]interface{}{ rlog.LogKeyTopic: topic, }) return nil } } else { queues = pc.allocateQueues } index := int(atomic.LoadInt64(&pc.nextQueueSequence)) % len(queues) atomic.AddInt64(&pc.nextQueueSequence, 1) nextQueue := queues[index] rlog.Info("defaultPullConsumer.getNextQueueOf", map[string]interface{}{ rlog.LogKeyTopic: topic, rlog.LogKeyConsumerGroup: pc.consumerGroup, rlog.LogKeyMessageQueue: queues, rlog.LogKeyAllocateMessageQueue: nextQueue.String(), }) return nextQueue } func (pc *defaultPullConsumer) checkPull(mq *primitive.MessageQueue, offset int64, numbers int) error { err := pc.makeSureStateOK() if err != nil { return err } if mq == nil { return errors2.ErrMQEmpty } if offset < 0 { return errors2.ErrOffset } if numbers <= 0 { return errors2.ErrNumbers } return nil } // TODO: add timeout limit // TODO: add hook func (pc *defaultPullConsumer) pull(ctx context.Context, mq *primitive.MessageQueue, data *internal.SubscriptionData, offset int64, numbers int) (*primitive.PullResult, error) { mq.Topic = utils.WrapNamespace(pc.option.Namespace, mq.Topic) pc.consumerGroup = utils.WrapNamespace(pc.option.Namespace, pc.consumerGroup) if err := pc.checkPull(mq, offset, numbers); err != nil { return nil, err } pc.subscriptionAutomatically(mq.Topic) sysFlag := buildSysFlag(false, true, true, false) pullResp, err := pc.pullInner(ctx, mq, data, offset, numbers, sysFlag, 0) if err != nil { return nil, err } pc.processPullResult(mq, pullResp, data) return pullResp, err } func (pc *defaultPullConsumer) nextOffsetOf(queue *primitive.MessageQueue) (int64, error) { return pc.computePullFromWhereWithException(queue) } // PullFrom pull messages of queue from the offset to offset + numbers func (pc *defaultPullConsumer) PullFrom(ctx context.Context, queue *primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error) { if err := pc.checkPull(queue, offset, numbers); err != nil { return nil, err } data := buildSubscriptionData(queue.Topic, pc.selector) return pc.pull(ctx, queue, data, offset, numbers) } // UpdateOffset updateOffset update offset of queue in mem func (pc *defaultPullConsumer) UpdateOffset(queue *primitive.MessageQueue, offset int64) error { return pc.updateOffset(queue, offset) } // PersistOffset persist all offset in mem. func (pc *defaultPullConsumer) PersistOffset(ctx context.Context, topic string) error { return pc.persistConsumerOffset() } // CurrentOffset return the current offset of queue in mem. func (pc *defaultPullConsumer) CurrentOffset(queue *primitive.MessageQueue) (int64, error) { v := pc.queryOffset(queue) return v, nil } // Shutdown close defaultConsumer, refuse new request. func (pc *defaultPullConsumer) Shutdown() error { var err error pc.closeOnce.Do(func() { if pc.option.TraceDispatcher != nil { pc.option.TraceDispatcher.Close() } close(pc.done) pc.client.UnregisterConsumer(pc.consumerGroup) err = pc.defaultConsumer.shutdown() }) return err } func (pc *defaultPullConsumer) PersistConsumerOffset() error { return pc.defaultConsumer.persistConsumerOffset() } func (pc *defaultPullConsumer) UpdateTopicSubscribeInfo(topic string, mqs []*primitive.MessageQueue) { pc.defaultConsumer.updateTopicSubscribeInfo(topic, mqs) } func (pc *defaultPullConsumer) IsSubscribeTopicNeedUpdate(topic string) bool { return pc.defaultConsumer.isSubscribeTopicNeedUpdate(topic) } func (pc *defaultPullConsumer) SubscriptionDataList() []*internal.SubscriptionData { return pc.defaultConsumer.SubscriptionDataList() } func (pc *defaultPullConsumer) IsUnitMode() bool { return pc.unitMode } func (pc *defaultPullConsumer) GetcType() string { return string(pc.cType) } func (pc *defaultPullConsumer) GetModel() string { return pc.model.String() } func (pc *defaultPullConsumer) GetWhere() string { switch pc.fromWhere { case ConsumeFromLastOffset: return "CONSUME_FROM_LAST_OFFSET" case ConsumeFromFirstOffset: return "CONSUME_FROM_FIRST_OFFSET" case ConsumeFromTimestamp: return "CONSUME_FROM_TIMESTAMP" default: return "UNKNOWN" } } func (pc *defaultPullConsumer) Rebalance() { switch pc.SubType { case Assign: pc.RebalanceViaTopic() break case Subscribe: pc.defaultConsumer.doBalance() break } } func (pc *defaultPullConsumer) RebalanceIfNotPaused() { switch pc.SubType { case Assign: pc.RebalanceViaTopic() break case Subscribe: pc.defaultConsumer.doBalanceIfNotPaused() break } } func (pc *defaultPullConsumer) RebalanceViaTopic() { changed := pc.defaultConsumer.updateProcessQueueTable(pc.topic, pc.allocateQueues) if changed { rlog.Info("PullConsumer rebalance result changed ", map[string]interface{}{ rlog.LogKeyAllocateMessageQueue: pc.allocateQueues, }) } } func (pc *defaultPullConsumer) GetConsumerRunningInfo(stack bool) *internal.ConsumerRunningInfo { info := internal.NewConsumerRunningInfo() pc.subscriptionDataTable.Range(func(key, value interface{}) bool { topic := key.(string) info.SubscriptionData[value.(*internal.SubscriptionData)] = true status := internal.ConsumeStatus{ PullRT: pc.stat.getPullRT(pc.consumerGroup, topic).avgpt, PullTPS: pc.stat.getPullTPS(pc.consumerGroup, topic).tps, ConsumeRT: pc.stat.getConsumeRT(pc.consumerGroup, topic).avgpt, ConsumeOKTPS: pc.stat.getConsumeOKTPS(pc.consumerGroup, topic).tps, ConsumeFailedTPS: pc.stat.getConsumeFailedTPS(pc.consumerGroup, topic).tps, ConsumeFailedMsgs: pc.stat.topicAndGroupConsumeFailedTPS.getStatsDataInHour(topic + "@" + pc.consumerGroup).sum, } info.StatusTable[topic] = status return true }) pc.processQueueTable.Range(func(key, value interface{}) bool { mq := key.(primitive.MessageQueue) pq := value.(*processQueue) pInfo := pq.currentInfo() pInfo.CommitOffset, _ = pc.storage.readWithException(&mq, _ReadMemoryThenStore) info.MQTable[mq] = pInfo return true }) if stack { var buffer strings.Builder err := pprof.Lookup("goroutine").WriteTo(&buffer, 2) if err != nil { rlog.Error("error when get stack ", map[string]interface{}{ "error": err, }) } else { info.JStack = buffer.String() } } nsAddr := "" for _, value := range pc.client.GetNameSrv().AddrList() { nsAddr += fmt.Sprintf("%s;", value) } info.Properties[internal.PropNameServerAddr] = nsAddr info.Properties[internal.PropConsumeType] = string(pc.cType) info.Properties[internal.PropConsumeOrderly] = strconv.FormatBool(pc.consumeOrderly) info.Properties[internal.PropThreadPoolCoreSize] = "-1" info.Properties[internal.PropConsumerStartTimestamp] = strconv.FormatInt(pc.consumerStartTimestamp, 10) return info } func (pc *defaultPullConsumer) ConsumeMessageDirectly(msg *primitive.MessageExt, brokerName string) *internal.ConsumeMessageDirectlyResult { return nil } func (pc *defaultPullConsumer) ResetOffset(topic string, table map[primitive.MessageQueue]int64) { } func (pc *defaultPullConsumer) SeekOffset(mq *primitive.MessageQueue, offset int64) { pc.mq2seekOffset.Store(*mq, offset) rlog.Info("pull consumer seek offset", map[string]interface{}{ "mq": mq, "offset": offset, }) } func (pc *defaultPullConsumer) OffsetForTimestamp(mq *primitive.MessageQueue, timestamp int64) (int64, error) { return pc.searchOffsetByTimestamp(mq, timestamp) } func (pc *defaultPullConsumer) messageQueueChanged(topic string, mqAll, mqDivided []*primitive.MessageQueue) { if pc.SubType == Assign { return } var allocateQueues []*primitive.MessageQueue pc.defaultConsumer.processQueueTable.Range(func(key, value interface{}) bool { mq := key.(primitive.MessageQueue) allocateQueues = append(allocateQueues, &mq) return true }) pc.allocateQueues = allocateQueues pc.defaultConsumer.client.SendHeartbeatToAllBrokerWithLock() } func (pc *defaultPullConsumer) sendMessageBack(brokerName string, msg *primitive.MessageExt, delayLevel int) bool { var brokerAddr string if len(brokerName) != 0 { brokerAddr = pc.defaultConsumer.client.GetNameSrv().FindBrokerAddrByName(brokerName) } else { brokerAddr = msg.StoreHost } resp, err := pc.client.InvokeSync(context.Background(), brokerAddr, pc.buildSendBackRequest(msg, delayLevel), 3*time.Second) if err != nil || resp.Code != internal.ResSuccess { // send back as a normal message return pc.defaultConsumer.sendMessageBackAsNormal(msg, pc.getMaxReconsumeTimes()) } return true } func (pc *defaultPullConsumer) buildSendBackRequest(msg *primitive.MessageExt, delayLevel int) *remote.RemotingCommand { req := &internal.ConsumerSendMsgBackRequestHeader{ Group: pc.consumerGroup, OriginTopic: msg.Topic, Offset: msg.CommitLogOffset, DelayLevel: delayLevel, OriginMsgId: msg.MsgId, MaxReconsumeTimes: pc.getMaxReconsumeTimes(), BrokerName: msg.Queue.BrokerName, } return remote.NewRemotingCommand(internal.ReqConsumerSendMsgBack, req, nil) } func (pc *defaultPullConsumer) getMaxReconsumeTimes() int32 { if pc.option.MaxReconsumeTimes == -1 { return 16 } else { return pc.option.MaxReconsumeTimes } } func (pc *defaultPullConsumer) pullMessage(request *PullRequest) { rlog.Debug("defaultPullConsumer start a new Pull Message task for PullRequest", map[string]interface{}{ rlog.LogKeyPullRequest: request.String(), }) var sleepTime time.Duration pq := request.pq go primitive.WithRecover(func() { for { select { case <-pc.done: rlog.Info("defaultPullConsumer close pullMessage.", map[string]interface{}{ rlog.LogKeyConsumerGroup: pc.consumerGroup, }) return default: pc.submitToConsume(request.pq, request.mq) if request.pq.IsDroppd() { rlog.Info("defaultPullConsumer quit pullMessage for dropped queue.", map[string]interface{}{ rlog.LogKeyConsumerGroup: pc.consumerGroup, }) return } } } }) for { NEXT: select { case <-pc.done: rlog.Info("defaultPullConsumer close message handle.", map[string]interface{}{ rlog.LogKeyConsumerGroup: pc.consumerGroup, }) return default: } if pq.IsDroppd() { rlog.Debug("defaultPullConsumer the request was dropped, so stop task", map[string]interface{}{ rlog.LogKeyPullRequest: request.String(), }) return } if sleepTime > 0 { rlog.Debug(fmt.Sprintf("defaultPullConsumer pull MessageQueue: %d sleep %d ms for mq: %v", request.mq.QueueId, sleepTime/time.Millisecond, request.mq), nil) time.Sleep(sleepTime) } // reset time sleepTime = pc.option.PullInterval.Load() pq.lastPullTime.Store(time.Now()) err := pc.makeSureStateOK() if err != nil { rlog.Warning("defaultPullConsumer state error", map[string]interface{}{ rlog.LogKeyUnderlayError: err.Error(), }) sleepTime = _PullDelayTimeWhenError goto NEXT } if pc.pause { rlog.Debug(fmt.Sprintf("defaultPullConsumer [%s] of [%s] was paused, execute pull request [%s] later", pc.option.InstanceName, pc.consumerGroup, request.String()), nil) sleepTime = _PullDelayTimeWhenSuspend goto NEXT } v, exist := pc.subscriptionDataTable.Load(request.mq.Topic) if !exist { rlog.Info("defaultPullConsumer find the consumer's subscription failed", map[string]interface{}{ rlog.LogKeyPullRequest: request.String(), }) sleepTime = _PullDelayTimeWhenError goto NEXT } nextOffset := pc.nextPullOffset(request.mq, request.nextOffset) beginTime := time.Now() sd := v.(*internal.SubscriptionData) sysFlag := buildSysFlag(false, true, true, false) pullRequest := &internal.PullMessageRequestHeader{ ConsumerGroup: pc.consumerGroup, Topic: request.mq.Topic, QueueId: int32(request.mq.QueueId), QueueOffset: nextOffset, MaxMsgNums: pc.option.PullBatchSize.Load(), SysFlag: sysFlag, CommitOffset: 0, SubExpression: sd.SubString, ExpressionType: string(TAG), SuspendTimeoutMillis: 20 * time.Second, BrokerName: request.mq.BrokerName, } brokerResult := pc.defaultConsumer.tryFindBroker(request.mq) if brokerResult == nil { rlog.Warning("defaultPullConsumer no broker found for mq", map[string]interface{}{ rlog.LogKeyPullRequest: request.mq.String(), }) sleepTime = _PullDelayTimeWhenError goto NEXT } if brokerResult.Slave { pullRequest.SysFlag = clearCommitOffsetFlag(pullRequest.SysFlag) } rlog.Debug(fmt.Sprintf("defaultPullConsumer pull message from broker: %s, request: %+v", brokerResult.BrokerAddr, pullRequest), nil) result, err := pc.client.PullMessage(context.Background(), brokerResult.BrokerAddr, pullRequest) if err != nil { rlog.Warning("defaultPullConsumer pull message from broker error", map[string]interface{}{ rlog.LogKeyBroker: brokerResult.BrokerAddr, rlog.LogKeyUnderlayError: err.Error(), }) sleepTime = _PullDelayTimeWhenError goto NEXT } if result.Status == primitive.PullBrokerTimeout { rlog.Warning("defaultPullConsumer pull broker timeout", map[string]interface{}{ rlog.LogKeyBroker: brokerResult.BrokerAddr, }) sleepTime = _PullDelayTimeWhenError goto NEXT } pc.processPullResult(request.mq, result, sd) switch result.Status { case primitive.PullFound: rlog.Debug(fmt.Sprintf("Topic: %s, QueueId: %d found messages.", request.mq.Topic, request.mq.QueueId), nil) prevRequestOffset := request.nextOffset request.nextOffset = result.NextBeginOffset rt := time.Now().Sub(beginTime) / time.Millisecond pc.stat.increasePullRT(pc.consumerGroup, request.mq.Topic, int64(rt)) msgFounded := result.GetMessageExts() firstMsgOffset := int64(math.MaxInt64) if len(msgFounded) != 0 { firstMsgOffset = msgFounded[0].QueueOffset pc.stat.increasePullTPS(pc.consumerGroup, request.mq.Topic, len(msgFounded)) pq.putMessage(msgFounded...) } if result.NextBeginOffset < prevRequestOffset || firstMsgOffset < prevRequestOffset { rlog.Warning("[BUG] pull message result maybe data wrong", map[string]interface{}{ "nextBeginOffset": result.NextBeginOffset, "firstMsgOffset": firstMsgOffset, "prevRequestOffset": prevRequestOffset, }) } case primitive.PullNoNewMsg, primitive.PullNoMsgMatched: request.nextOffset = result.NextBeginOffset pc.correctTagsOffset(request) case primitive.PullOffsetIllegal: rlog.Warning("defaultPullConsumer the pull request offset illegal", map[string]interface{}{ rlog.LogKeyPullRequest: request.String(), "result": result.String(), }) request.nextOffset = result.NextBeginOffset pq.WithDropped(true) time.Sleep(10 * time.Second) pc.storage.update(request.mq, request.nextOffset, false) pc.storage.persist([]*primitive.MessageQueue{request.mq}) pc.processQueueTable.Delete(*request.mq) rlog.Warning(fmt.Sprintf("defaultPullConsumer fix the pull request offset: %s", request.String()), nil) default: rlog.Warning(fmt.Sprintf("defaultPullConsumer unknown pull status: %v", result.Status), nil) sleepTime = _PullDelayTimeWhenError } } } func (pc *defaultPullConsumer) correctTagsOffset(pr *PullRequest) { if pr.pq.cachedMsgCount.Load() <= 0 { pc.storage.update(pr.mq, pr.nextOffset, true) } } func (pc *defaultPullConsumer) consumeMessageConcurrently(pq *processQueue, mq *primitive.MessageQueue) { msgList := pq.getMessages() if msgList == nil { return } if pq.IsDroppd() { rlog.Info("defaultPullConsumer consumeMessageConcurrently the message queue not be able to consume, because it was dropped", map[string]interface{}{ rlog.LogKeyMessageQueue: mq.String(), rlog.LogKeyConsumerGroup: pc.consumerGroup, }) return } cr := &ConsumeRequest{ messageQueue: mq, processQueue: pq, msgList: msgList, } select { case <-pq.closeChan: return case pc.consumeRequestCache <- cr: } } func (pc *defaultPullConsumer) GetConsumerStatus(topic string) *internal.ConsumerStatus { consumerStatus := internal.NewConsumerStatus() mqOffsetMap := pc.storage.getMQOffsetMap(topic) if mqOffsetMap != nil { consumerStatus.MQOffsetMap = mqOffsetMap } return consumerStatus } func (pc *defaultPullConsumer) validate() error { if err := internal.ValidateGroup(pc.consumerGroup); err != nil { return err } if pc.consumerGroup == internal.DefaultConsumerGroup { return fmt.Errorf("consumerGroup can't equal [%s], please specify another one", internal.DefaultConsumerGroup) } if pc.SubType == None { return errors2.ErrBlankSubType } return nil }