inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go (796 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package client import ( "context" "encoding/binary" "fmt" "hash/crc32" "os" "strconv" "strings" "sync" "sync/atomic" "time" "github.com/golang/protobuf/proto" "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/config" "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/errs" "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/log" "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/metadata" "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/multiplexing" "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/protocol" "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/remote" "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/rpc" "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/selector" "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/sub" "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/transport" "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/util" ) const ( consumeStatusNormal = 0 consumeStatusFromMax = 1 consumeStatusFromMaxAlways = 2 msgFlagIncProperties = 0x01 ) type consumer struct { clientID string config *config.Config subInfo *sub.SubInfo rmtDataCache *remote.RmtDataCache visitToken int64 authorizedInfo string nextAuth2Master int32 nextAuth2Broker int32 master *selector.Node client rpc.RPCClient selector selector.Selector lastMasterHb int64 masterHBRetry int heartbeatManager *heartbeatManager unreportedTimes int cancel context.CancelFunc routineClosed chan struct{} closeOnce sync.Once } // NewConsumer returns a consumer which is constructed by a given config. func NewConsumer(config *config.Config) (Consumer, error) { if err := config.ValidateConsumer(); err != nil { return nil, err } log.Infof("The config of the consumer is %s", config) selector, err := selector.Get("ip") if err != nil { return nil, err } clientID := newClient(config.Consumer.Group) pool := multiplexing.NewPool() opts := &transport.Options{} if config.Net.TLS.Enable { opts.TLSEnable = true opts.CACertFile = config.Net.TLS.CACertFile opts.TLSCertFile = config.Net.TLS.TLSCertFile opts.TLSKeyFile = config.Net.TLS.TLSKeyFile opts.TLSServerName = config.Net.TLS.TLSServerName } client := rpc.New(pool, opts, config) r := remote.NewRmtDataCache() r.SetConsumerInfo(clientID, config.Consumer.Group) c := &consumer{ config: config, clientID: clientID, subInfo: sub.NewSubInfo(config), rmtDataCache: r, selector: selector, client: client, visitToken: util.InvalidValue, unreportedTimes: 0, routineClosed: make(chan struct{}), } ctx := context.Background() ctx, c.cancel = context.WithCancel(ctx) c.subInfo.SetClientID(clientID) hbm := newHBManager(c) c.heartbeatManager = hbm go c.routine(ctx) go c.processRebalanceEvent(ctx) log.Infof("[CONSUMER] start consumer success, client=%s", clientID) return c, nil } func (c *consumer) routine(ctx context.Context) { defer close(c.routineClosed) for { select { case <-ctx.Done(): return default: } // select master node, err := c.selector.Select(c.config.Consumer.Masters) if err != nil { log.Errorf("[CONSUMER] select error %v", err) time.Sleep(time.Second) continue } c.master = node log.Infof("[CONSUMER] master %+v", c.master) // register to master if err := c.register2Master(ctx); err != nil { log.Errorf("[CONSUMER] register2Master error %v", err) time.Sleep(time.Second) continue } c.lastMasterHb = time.Now().UnixMilli() // heartbeat to master time.Sleep(c.config.Heartbeat.Interval / 2) if err := c.heartbeat2Master(ctx); err != nil { log.Errorf("[CONSUMER] heartbeat2Master error %v", err) } else { c.lastMasterHb = time.Now().UnixMilli() } heartbeatRetry := 0 for { time.Sleep(c.config.Heartbeat.Interval) select { case <-ctx.Done(): return default: } if heartbeatRetry >= c.config.Heartbeat.MaxRetryTimes { break } if err := c.heartbeat2Master(ctx); err != nil { log.Errorf("[CONSUMER] heartbeat2Master error %v", err) heartbeatRetry++ continue } else { heartbeatRetry = 0 c.lastMasterHb = time.Now().UnixMilli() } } } } func (c *consumer) register2Master(ctx context.Context) error { rsp, err := c.sendRegRequest2Master(ctx) if err != nil { return err } if !rsp.GetSuccess() { return errs.New(rsp.GetErrCode(), rsp.GetErrMsg()) } c.processRegisterResponseM2C(rsp) return nil } func (c *consumer) sendRegRequest2Master(ctx context.Context) (*protocol.RegisterResponseM2C, error) { ctx, cancel := context.WithTimeout(ctx, c.config.Net.ReadTimeout) defer cancel() m := &metadata.Metadata{} node := &metadata.Node{} node.SetHost(util.GetLocalHost()) node.SetAddress(c.master.Address) auth := &protocol.AuthenticateInfo{} if c.needGenMasterCertificateInfo(true) { util.GenMasterAuthenticateToken(auth, c.config.Net.Auth.UserName, c.config.Net.Auth.Password) } m.SetNode(node) sub := &metadata.SubscribeInfo{} sub.SetGroup(c.config.Consumer.Group) m.SetSubscribeInfo(sub) rsp, err := c.client.RegisterRequestC2M(ctx, m, c.subInfo, c.rmtDataCache) return rsp, err } func (c *consumer) processRegisterResponseM2C(rsp *protocol.RegisterResponseM2C) { if !rsp.GetNotAllocated() { c.subInfo.CASIsNotAllocated(1, 0) } if rsp.GetDefFlowCheckId() != 0 || rsp.GetGroupFlowCheckId() != 0 { if rsp.GetDefFlowCheckId() != 0 { c.rmtDataCache.UpdateDefFlowCtrlInfo(rsp.GetDefFlowCheckId(), rsp.GetDefFlowControlInfo()) } qryPriorityID := c.rmtDataCache.GetQryPriorityID() if rsp.GetQryPriorityId() != 0 { qryPriorityID = rsp.GetQryPriorityId() } c.rmtDataCache.UpdateGroupFlowCtrlInfo(qryPriorityID, rsp.GetGroupFlowCheckId(), rsp.GetGroupFlowControlInfo()) } if rsp.GetAuthorizedInfo() != nil { c.processAuthorizedToken(rsp.GetAuthorizedInfo()) } } func (c *consumer) processAuthorizedToken(info *protocol.MasterAuthorizedInfo) { atomic.StoreInt64(&c.visitToken, info.GetVisitAuthorizedToken()) c.authorizedInfo = info.GetAuthAuthorizedToken() } func (c *consumer) heartbeat2Master(ctx context.Context) error { rsp, err := c.sendHeartbeat2Master(ctx) if err != nil { return err } if !rsp.GetSuccess() { return errs.New(rsp.GetErrCode(), rsp.GetErrMsg()) } c.processHBResponseM2C(rsp) return nil } func (c *consumer) sendHeartbeat2Master(ctx context.Context) (*protocol.HeartResponseM2C, error) { if time.Now().UnixNano()/int64(time.Millisecond)-c.lastMasterHb > 30000 { c.rmtDataCache.HandleExpiredPartitions(c.config.Consumer.MaxConfirmWait) } m := &metadata.Metadata{} node := &metadata.Node{} node.SetHost(util.GetLocalHost()) node.SetAddress(c.master.Address) m.SetNode(node) sub := &metadata.SubscribeInfo{} sub.SetGroup(c.config.Consumer.Group) m.SetSubscribeInfo(sub) auth := &protocol.AuthenticateInfo{} if c.needGenMasterCertificateInfo(true) { util.GenMasterAuthenticateToken(auth, c.config.Net.Auth.UserName, c.config.Net.Auth.Password) } c.unreportedTimes++ if c.unreportedTimes > c.config.Consumer.MaxSubInfoReportInterval { m.SetReportTimes(true) c.unreportedTimes = 0 } ctx, cancel := context.WithTimeout(ctx, c.config.Net.ReadTimeout) defer cancel() rsp, err := c.client.HeartRequestC2M(ctx, m, c.subInfo, c.rmtDataCache) return rsp, err } func (c *consumer) processHBResponseM2C(rsp *protocol.HeartResponseM2C) { c.masterHBRetry = 0 if !rsp.GetNotAllocated() { c.subInfo.CASIsNotAllocated(1, 0) } if rsp.GetDefFlowCheckId() != 0 || rsp.GetGroupFlowCheckId() != 0 { if rsp.GetDefFlowCheckId() != 0 { c.rmtDataCache.UpdateDefFlowCtrlInfo(rsp.GetDefFlowCheckId(), rsp.GetDefFlowControlInfo()) } qryPriorityID := c.rmtDataCache.GetQryPriorityID() if rsp.GetQryPriorityId() != 0 { qryPriorityID = rsp.GetQryPriorityId() } c.rmtDataCache.UpdateGroupFlowCtrlInfo(qryPriorityID, rsp.GetGroupFlowCheckId(), rsp.GetGroupFlowControlInfo()) } if rsp.GetAuthorizedInfo() != nil { c.processAuthorizedToken(rsp.GetAuthorizedInfo()) } if rsp.GetRequireAuth() { atomic.StoreInt32(&c.nextAuth2Master, 1) } if rsp.GetEvent() != nil { event := rsp.GetEvent() subscribeInfo := make([]*metadata.SubscribeInfo, 0, len(event.GetSubscribeInfo())) for _, sub := range event.GetSubscribeInfo() { s, err := metadata.NewSubscribeInfo(sub) if err != nil { continue } subscribeInfo = append(subscribeInfo, s) } e := metadata.NewEvent(event.GetRebalanceId(), event.GetOpType(), subscribeInfo) c.rmtDataCache.OfferEvent(e) } } // GetMessage implementation of TubeMQ consumer. func (c *consumer) GetMessage() (*ConsumerResult, error) { partition, bookedTime, err := c.rmtDataCache.SelectPartition() if err != nil { return nil, err } confirmContext := partition.GetPartitionKey() + "@" + strconv.FormatInt(bookedTime, 10) isFiltered := c.subInfo.IsFiltered(partition.GetTopic()) pi := &PeerInfo{ BrokerHost: partition.GetBroker().GetHost(), PartitionID: uint32(partition.GetPartitionID()), PartitionKey: partition.GetPartitionKey(), CurrOffset: util.InvalidValue, } m := &metadata.Metadata{} node := &metadata.Node{} node.SetHost(util.GetLocalHost()) node.SetAddress(partition.GetBroker().GetAddress()) m.SetNode(node) sub := &metadata.SubscribeInfo{} sub.SetGroup(c.config.Consumer.Group) sub.SetPartition(partition) m.SetSubscribeInfo(sub) ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout) defer cancel() rsp, err := c.client.GetMessageRequestC2B(ctx, m, c.subInfo, c.rmtDataCache) if err != nil { log.Infof("[CONSUMER]GetMessage error %s", err.Error()) if err := c.rmtDataCache.ReleasePartition(true, isFiltered, confirmContext, false); err != nil { log.Errorf("[CONSUMER]GetMessage release partition error %s", err.Error()) return nil, err } return nil, err } cr := &ConsumerResult{ TopicName: partition.GetTopic(), ConfirmContext: confirmContext, PeerInfo: pi, } msgs, err := c.processGetMessageRspB2C(pi, isFiltered, partition, confirmContext, rsp) if err != nil { return cr, err } cr.Messages = msgs return cr, err } // Confirm implementation of TubeMQ consumer. func (c *consumer) Confirm(confirmContext string, consumed bool) (*ConfirmResult, error) { partitionKey, bookedTime, err := util.ParseConfirmContext(confirmContext) if err != nil { return nil, errs.New(errs.RetBadRequest, "illegel confirm_context content: unregular confirm_context value format") } topic, err := parsePartitionKeyToTopic(partitionKey) if err != nil { return nil, errs.New(errs.RetBadRequest, err.Error()) } if !c.rmtDataCache.IsPartitionInUse(partitionKey, bookedTime) { return nil, errs.New(errs.RetErrConfirmTimeout, "The confirm_context's value invalid!") } partition := c.rmtDataCache.GetPartition(partitionKey) if partition == nil { return nil, errs.New(errs.RetErrConfirmTimeout, "Not found the partition by confirm_context!") } defer c.rmtDataCache.ReleasePartition(true, c.subInfo.IsFiltered(topic), confirmContext, consumed) rsp, err := c.sendConfirmReq2Broker(partition, consumed) if err != nil { log.Infof("[CONSUMER]Confirm error %s", err.Error()) return nil, err } pi := &PeerInfo{ BrokerHost: partition.GetBroker().GetHost(), PartitionID: uint32(partition.GetPartitionID()), PartitionKey: partition.GetPartitionKey(), CurrOffset: rsp.GetCurrOffset(), MaxOffset: rsp.GetMaxOffset(), } cr := &ConfirmResult{ ConfirmContext: confirmContext, TopicName: partition.GetTopic(), PeerInfo: pi, } if !rsp.GetSuccess() { return cr, errs.New(rsp.GetErrCode(), rsp.GetErrMsg()) } c.rmtDataCache.BookPartitionInfo(partitionKey, rsp.GetCurrOffset(), rsp.GetMaxOffset()) return cr, err } func (c *consumer) sendConfirmReq2Broker(partition *metadata.Partition, consumed bool) (*protocol.CommitOffsetResponseB2C, error) { m := &metadata.Metadata{} node := &metadata.Node{} node.SetHost(util.GetLocalHost()) node.SetAddress(partition.GetBroker().GetAddress()) m.SetNode(node) sub := &metadata.SubscribeInfo{} sub.SetGroup(c.config.Consumer.Group) partition.SetLastConsumed(consumed) sub.SetPartition(partition) m.SetSubscribeInfo(sub) ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout-500) defer cancel() rsp, err := c.client.CommitOffsetRequestC2B(ctx, m, c.subInfo) return rsp, err } func parsePartitionKeyToTopic(partitionKey string) (string, error) { pos1 := strings.Index(partitionKey, ":") if pos1 == -1 { return "", fmt.Errorf("illegel confirm_context content: unregular index key value format") } topic := partitionKey[pos1+1:] pos2 := strings.LastIndex(topic, ":") if pos2 == -1 { return "", fmt.Errorf("illegel confirm_context content: unregular index's topic key value format") } topic = topic[:pos2] return topic, nil } // GetCurrConsumedInfo implementation of TubeMQ consumer. func (c *consumer) GetCurrConsumedInfo() map[string]*remote.ConsumerOffset { partitionOffset := c.rmtDataCache.GetCurPartitionOffset() consumedInfo := make(map[string]*remote.ConsumerOffset, len(partitionOffset)) for partitionKey, offset := range partitionOffset { co := &remote.ConsumerOffset{ PartitionKey: partitionKey, CurrOffset: offset.CurrOffset, MaxOffset: offset.MaxOffset, UpdateTime: offset.UpdateTime, } consumedInfo[partitionKey] = co } return consumedInfo } // GetClientID implementation of TubeMQ consumer. func (c *consumer) GetClientID() string { return c.clientID } // Close implementation of TubeMQ consumer. func (c *consumer) Close() { c.closeOnce.Do(func() { log.Infof("[CONSUMER]Begin to close consumer, client=%s", c.clientID) c.cancel() c.heartbeatManager.close() c.close2Master() c.closeAllBrokers() c.client.Close() <-c.routineClosed log.Infof("[CONSUMER]Consumer has been closed successfully, client=%s", c.clientID) }) } func (c *consumer) processRebalanceEvent(ctx context.Context) { log.Info("[CONSUMER]Rebalance event Handler starts!") for { select { case <-ctx.Done(): log.Info("[CONSUMER] Rebalance event Handler stopped!") return case event, ok := <-c.rmtDataCache.EventCh: if ok { log.Infof("%+v", event) c.rmtDataCache.ClearEvent() switch event.GetEventType() { case metadata.Disconnect, metadata.OnlyDisconnect: c.disconnect2Broker(event) c.rmtDataCache.OfferEventResult(event) case metadata.Connect, metadata.OnlyConnect: c.connect2Broker(ctx, event) c.rmtDataCache.OfferEventResult(event) } } } } } func (c *consumer) disconnect2Broker(event *metadata.ConsumerEvent) { log.Tracef("[disconnect2Broker] begin to process disconnect event, client=%s", c.clientID) subscribeInfo := event.GetSubscribeInfo() if len(subscribeInfo) > 0 { removedPartitions := make(map[*metadata.Node][]*metadata.Partition) c.rmtDataCache.RemoveAndGetPartition(subscribeInfo, c.config.Consumer.RollbackIfConfirmTimeout, removedPartitions) if len(removedPartitions) > 0 { c.unregister2Broker(removedPartitions) } } event.SetEventStatus(metadata.Done) log.Tracef("[disconnect2Broker] disconnect event finished, client=%s", c.clientID) } func (c *consumer) unregister2Broker(unRegPartitions map[*metadata.Node][]*metadata.Partition) { if len(unRegPartitions) == 0 { return } var wg sync.WaitGroup for _, partitions := range unRegPartitions { for _, partition := range partitions { log.Tracef("unregister2Brokers, partition key=%s", partition.GetPartitionKey()) partition := partition wg.Add(1) go func() { defer wg.Done() if err := c.sendUnregisterReq2Broker(partition); err != nil { log.Errorf("[CONSUMER] unregister partition %+v failed, error %v", partition, err) } else { log.Infof("[connect2Broker] unregister partition %+v success", partition) } }() } } wg.Wait() } func (c *consumer) sendUnregisterReq2Broker(partition *metadata.Partition) error { ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout) defer cancel() m := &metadata.Metadata{} node := &metadata.Node{} node.SetHost(util.GetLocalHost()) node.SetAddress(partition.GetBroker().GetAddress()) m.SetNode(node) m.SetReadStatus(1) sub := &metadata.SubscribeInfo{} sub.SetGroup(c.config.Consumer.Group) sub.SetConsumerID(c.clientID) sub.SetPartition(partition) m.SetSubscribeInfo(sub) auth := c.genBrokerAuthenticInfo(true) c.subInfo.SetAuthorizedInfo(auth) rsp, err := c.client.UnregisterRequestC2B(ctx, m, c.subInfo) if err != nil { log.Errorf("[CONSUMER] fail to unregister partition %s, error %s", partition, err.Error()) return err } if !rsp.GetSuccess() { log.Errorf("[CONSUMER] fail to unregister partition %s, err code: %d, error msg %s", partition, rsp.GetErrCode(), rsp.GetErrMsg()) return errs.New(rsp.GetErrCode(), rsp.GetErrMsg()) } return nil } func (c *consumer) connect2Broker(ctx context.Context, event *metadata.ConsumerEvent) { log.Tracef("[connect2Broker] connect event begin, client=%s", c.clientID) if len(event.GetSubscribeInfo()) > 0 { unsubPartitions := c.rmtDataCache.FilterPartitions(event.GetSubscribeInfo()) n := len(unsubPartitions) if len(unsubPartitions) > 0 { for i, partition := range unsubPartitions { select { case <-ctx.Done(): return default: } node := &metadata.Node{} node.SetHost(util.GetLocalHost()) node.SetAddress(partition.GetBroker().GetAddress()) rsp, err := c.sendRegisterReq2Broker(partition, node) if err != nil { log.Warnf("[connect2Broker] error %s", err.Error()) continue } if !rsp.GetSuccess() { log.Warnf("[connect2Broker] err code:%d, err msg: %s", rsp.GetErrCode(), rsp.GetErrMsg()) return } log.Infof("[connect2Broker] %v/%v register partition %+v success", i, n, partition) c.rmtDataCache.AddNewPartition(partition) c.heartbeatManager.registerBroker(partition.GetBroker()) } } } c.subInfo.SetNotFirstRegistered() event.SetEventStatus(metadata.Done) log.Tracef("[connect2Broker] connect event finished, client ID=%s", c.clientID) } func (c *consumer) sendRegisterReq2Broker(partition *metadata.Partition, node *metadata.Node) (*protocol.RegisterResponseB2C, error) { ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout) defer cancel() m := &metadata.Metadata{} m.SetNode(node) sub := &metadata.SubscribeInfo{} sub.SetGroup(c.config.Consumer.Group) sub.SetConsumerID(c.clientID) sub.SetPartition(partition) m.SetSubscribeInfo(sub) isFirstRegister := c.rmtDataCache.IsFirstRegister(partition.GetPartitionKey()) m.SetReadStatus(c.getConsumeReadStatus(isFirstRegister)) auth := c.genBrokerAuthenticInfo(true) c.subInfo.SetAuthorizedInfo(auth) rsp, err := c.client.RegisterRequestC2B(ctx, m, c.subInfo, c.rmtDataCache) return rsp, err } func newClient(group string) string { return group + "_" + util.GetLocalHost() + "-" + strconv.Itoa(os.Getpid()) + "-" + strconv.Itoa(int(time.Now().Unix()*1000)) + "-" + strconv.Itoa(int(atomic.AddUint64(&clientID, 1))) + "-" + "go-" + tubeMQClientVersion } func (c *consumer) genBrokerAuthenticInfo(force bool) *protocol.AuthorizedInfo { needAdd := false auth := &protocol.AuthorizedInfo{ VisitAuthorizedToken: proto.Int64(atomic.LoadInt64(&c.visitToken)), } if c.config.Net.Auth.Enable { if force { needAdd = true atomic.StoreInt32(&c.nextAuth2Broker, 0) } else if atomic.LoadInt32(&c.nextAuth2Broker) == 1 { if atomic.CompareAndSwapInt32(&c.nextAuth2Broker, 1, 0) { needAdd = true } } if needAdd { authToken := util.GenBrokerAuthenticateToken(c.config.Net.Auth.UserName, c.config.Net.Auth.Password) auth.AuthAuthorizedToken = proto.String(authToken) } } return auth } func (c *consumer) needGenMasterCertificateInfo(force bool) bool { needAdd := false if c.config.Net.Auth.Enable { if force { needAdd = true atomic.StoreInt32(&c.nextAuth2Master, 0) } else if atomic.LoadInt32(&c.nextAuth2Master) == 1 { if atomic.CompareAndSwapInt32(&c.nextAuth2Master, 1, 0) { needAdd = true } } if needAdd { } } return needAdd } func (c *consumer) getConsumeReadStatus(isFirstReg bool) int32 { readStatus := consumeStatusNormal if isFirstReg { if c.config.Consumer.ConsumePosition == 0 { readStatus = consumeStatusFromMax log.Infof("[Consumer From Max Offset], client=%s", c.clientID) } else if c.config.Consumer.ConsumePosition > 0 { readStatus = consumeStatusFromMaxAlways log.Infof("[Consumer From Max Offset Always], client=%s", c.clientID) } } return int32(readStatus) } func (c *consumer) checkPartitionErr() error { startTime := time.Now().UnixNano() / int64(time.Millisecond) for { ret := c.rmtDataCache.GetCurConsumeStatus() if ret == 0 { return nil } if c.config.Consumer.MaxPartCheckPeriod >= 0 && time.Now().UnixNano()/int64(time.Millisecond)-startTime >= c.config.Consumer.MaxPartCheckPeriod.Milliseconds() { switch ret { case errs.RetErrNoPartAssigned: return errs.ErrNoPartAssigned case errs.RetErrAllPartInUse: return errs.ErrAllPartInUse case errs.RetErrAllPartWaiting: return errs.ErrAllPartWaiting } } time.Sleep(c.config.Consumer.PartCheckSlice) } } func (c *consumer) processGetMessageRspB2C(pi *PeerInfo, filtered bool, partition *metadata.Partition, confirmContext string, rsp *protocol.GetMessageResponseB2C) ([]*Message, error) { limitDlt := int64(300) escLimit := rsp.GetEscFlowCtrl() now := time.Now().UnixNano() / int64(time.Millisecond) switch rsp.GetErrCode() { case errs.RetSuccess: dataDleVal := util.InvalidValue if rsp.GetCurrDataDlt() >= 0 { dataDleVal = rsp.GetCurrDataDlt() } currOffset := util.InvalidValue if rsp.GetCurrOffset() >= 0 { currOffset = rsp.GetCurrOffset() } maxOffset := util.InvalidValue if rsp.GetMaxOffset() >= 0 { maxOffset = rsp.GetMaxOffset() } msgSize, msgs := c.convertMessages(filtered, partition.GetTopic(), rsp) c.rmtDataCache.BookPartitionInfo(partition.GetPartitionKey(), currOffset, maxOffset) cd := metadata.NewConsumeData(now, 200, escLimit, int32(msgSize), 0, dataDleVal, rsp.GetRequireSlow()) c.rmtDataCache.BookConsumeData(partition.GetPartitionKey(), cd) pi.CurrOffset = currOffset pi.MaxOffset = maxOffset log.Tracef("[CONSUMER] getMessage count=%ld, from %s, client=%s", len(msgs), partition.GetPartitionKey(), c.clientID) return msgs, nil case errs.RetErrHBNoNode, errs.RetCertificateFailure, errs.RetErrDuplicatePartition: partitionKey, _, err := util.ParseConfirmContext(confirmContext) if err != nil { return nil, err } c.rmtDataCache.RemovePartition([]string{partitionKey}) return nil, errs.New(rsp.GetErrCode(), rsp.GetErrMsg()) case errs.RetErrConsumeSpeedLimit: defDltTime := int64(rsp.GetMinLimitTime()) if defDltTime == 0 { defDltTime = c.config.Consumer.MsgNotFoundWait.Milliseconds() } cd := metadata.NewConsumeData(now, rsp.GetErrCode(), false, 0, limitDlt, defDltTime, rsp.GetRequireSlow()) c.rmtDataCache.BookPartitionInfo(partition.GetPartitionKey(), util.InvalidValue, util.InvalidValue) c.rmtDataCache.BookConsumeData(partition.GetPartitionKey(), cd) c.rmtDataCache.ReleasePartition(true, filtered, confirmContext, false) return nil, errs.New(rsp.GetErrCode(), rsp.GetErrMsg()) case errs.RetErrNotFound: limitDlt = c.config.Consumer.MsgNotFoundWait.Milliseconds() case errs.RetErrForbidden: limitDlt = 2000 case errs.RetErrMoved: limitDlt = 200 case errs.RetErrServiceUnavailable: } if rsp.GetErrCode() != errs.RetSuccess { cd := metadata.NewConsumeData(now, rsp.GetErrCode(), false, 0, limitDlt, util.InvalidValue, rsp.GetRequireSlow()) c.rmtDataCache.BookPartitionInfo(partition.GetPartitionKey(), util.InvalidValue, util.InvalidValue) c.rmtDataCache.BookConsumeData(partition.GetPartitionKey(), cd) c.rmtDataCache.ReleasePartition(true, filtered, confirmContext, false) return nil, errs.New(rsp.GetErrCode(), rsp.GetErrMsg()) } return nil, errs.New(rsp.GetErrCode(), rsp.GetErrMsg()) } func (c *consumer) convertMessages(filtered bool, topic string, rsp *protocol.GetMessageResponseB2C) (int, []*Message) { msgSize := 0 if len(rsp.GetMessages()) == 0 { return msgSize, nil } msgs := make([]*Message, 0, len(rsp.GetMessages())) for _, m := range rsp.GetMessages() { checkSum := uint64(crc32.Update(0, crc32.IEEETable, m.GetPayLoadData())) & 0x7FFFFFFF if int32(checkSum) != m.GetCheckSum() { continue } readPos := 0 payLoadData := m.GetPayLoadData() dataLen := len(payLoadData) var properties map[string]string if m.GetFlag()&msgFlagIncProperties == 1 { if len(payLoadData) < 4 { continue } attrLen := int(binary.BigEndian.Uint32(payLoadData[:4])) readPos += 4 dataLen -= 4 if attrLen > dataLen { continue } attribute := payLoadData[readPos : readPos+attrLen] readPos += attrLen dataLen -= attrLen properties = util.SplitToMap(string(attribute), ",", "=") if filtered { topicFilters := c.subInfo.GetTopicFilters() if msgKey, ok := properties["$msgType$"]; ok { if filters, ok := topicFilters[topic]; ok { found := false for _, filter := range filters { if filter == msgKey { found = true break } } if !found { continue } } } } } msg := &Message{ Topic: topic, Flag: m.GetFlag(), ID: m.GetMessageId(), Properties: properties, DataLen: int32(dataLen), Data: payLoadData[readPos:], } msgs = append(msgs, msg) msgSize += dataLen } return msgSize, msgs } func (c *consumer) close2Master() { log.Infof("[CONSUMER] close2Master begin, client=%s", c.clientID) ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout) defer cancel() m := &metadata.Metadata{} node := &metadata.Node{} node.SetHost(util.GetLocalHost()) node.SetAddress(c.master.Address) m.SetNode(node) sub := &metadata.SubscribeInfo{} sub.SetGroup(c.config.Consumer.Group) m.SetSubscribeInfo(sub) mci := &protocol.MasterCertificateInfo{} auth := &protocol.AuthenticateInfo{} if c.needGenMasterCertificateInfo(true) { util.GenMasterAuthenticateToken(auth, c.config.Net.Auth.UserName, c.config.Net.Auth.Password) } c.subInfo.SetMasterCertificateInfo(mci) rsp, err := c.client.CloseRequestC2M(ctx, m, c.subInfo) if err != nil { log.Errorf("[CONSUMER] fail to close master, error: %s", err.Error()) return } if !rsp.GetSuccess() { log.Errorf("[CONSUMER] fail to close master, error code: %d, error msg: %s", rsp.GetErrCode(), rsp.GetErrMsg()) return } log.Infof("[CONSUMER] close2Master finished, client=%s", c.clientID) } func (c *consumer) closeAllBrokers() { log.Infof("[CONSUMER] closeAllBrokers begin, client=%s", c.clientID) partitions := c.rmtDataCache.GetAllClosedBrokerParts() if len(partitions) > 0 { c.unregister2Broker(partitions) } log.Infof("[CONSUMER] closeAllBrokers end, client=%s", c.clientID) }