func GetOrNewRocketMQClient()

in internal/client.go [195:411]


// GetOrNewRocketMQClient returns the client stored in clientMap under the
// ClientID() of a candidate built from option, storing the candidate itself
// when no entry exists yet.
//
// If an entry already exists, the name server addresses resolved from
// option.Namesrv are compared, ignoring order, with those resolved by the
// stored client. On any difference an error is logged and nil is returned.
//
// If the candidate is newly stored, handlers for broker-initiated requests are
// registered on its remoting client. These handlers close over the option and
// callbackCh of this first call; a later call that finds the existing entry
// registers nothing, so the callbackCh it passes is not wired to any handler.
//
// On both non-nil return paths the returned client is assigned as the
// bundleClient of its own name server.
//
// NOTE(review): when an existing entry is found, the candidate's freshly
// created remoting client is dropped without any shutdown call — confirm that
// remote.NewRemotingClient holds no resources that need releasing.
func GetOrNewRocketMQClient(option ClientOptions, callbackCh chan interface{}) RMQClient {
	client := &rmqClient{
		option:       option,
		remoteClient: remote.NewRemotingClient(option.RemotingClientConfig),
		done:         make(chan struct{}),
	}
	actual, loaded := clientMap.LoadOrStore(client.ClientID(), client)

	if loaded {
		// An instance with this ID already exists: reuse it, but only if the
		// caller asked for the same set of name servers.
		client = actual.(*rmqClient)
		now := option.Namesrv.(*namesrvs).resolver.Resolve()
		old := client.GetNameSrv().(*namesrvs).resolver.Resolve()
		if !sameNamesrvAddrs(now, old) {
			rlog.Error("different namesrv option in the same instance", map[string]interface{}{
				"NewNameSrv":    now,
				"BeforeNameSrv": old,
			})
			return nil
		}
	} else {
		// Consumer group notification from the broker: rebalance unless paused.
		client.remoteClient.RegisterRequestFunc(ReqNotifyConsumerIdsChanged, func(req *remote.RemotingCommand, addr net.Addr) *remote.RemotingCommand {
			rlog.Info("receive broker's notification to consumer group", map[string]interface{}{
				rlog.LogKeyConsumerGroup: req.ExtFields["consumerGroup"],
			})
			client.RebalanceIfNotPaused()
			return nil
		})

		// Transaction state check: decode the message and hand it to callbackCh.
		client.remoteClient.RegisterRequestFunc(ReqCheckTransactionState, func(req *remote.RemotingCommand, addr net.Addr) *remote.RemotingCommand {
			header := new(CheckTransactionStateRequestHeader)
			header.Decode(req.ExtFields)
			msgExts := primitive.DecodeMessage(req.Body)
			if len(msgExts) == 0 {
				rlog.Warning("checkTransactionState, decode message failed", nil)
				return nil
			}
			msgExt := msgExts[0]
			// TODO: add namespace support
			transactionID := msgExt.GetProperty(primitive.PropertyUniqueClientMessageIdKeyIndex)
			if len(transactionID) > 0 {
				msgExt.TransactionId = transactionID
			}
			group := msgExt.GetProperty(primitive.PropertyProducerGroup)
			if group == "" {
				rlog.Warning("checkTransactionState, pick producer group failed", nil)
				return nil
			}
			// Only forward checks addressed to the group this client was created for.
			if option.GroupName != group {
				rlog.Warning("producer group is not equal", nil)
				return nil
			}
			callback := &CheckTransactionStateCallback{
				Addr:   addr,
				Msg:    msgExt,
				Header: *header,
			}
			callbackCh <- callback
			return nil
		})

		// Running info query: answered by the client instance named in the header.
		client.remoteClient.RegisterRequestFunc(ReqGetConsumerRunningInfo, func(req *remote.RemotingCommand, addr net.Addr) *remote.RemotingCommand {
			rlog.Info("receive get consumer running info request...", nil)
			header := new(GetConsumerRunningInfoHeader)
			header.Decode(req.ExtFields)
			val, exist := clientMap.Load(header.clientID)
			res := remote.NewRemotingCommand(ResError, nil, nil)
			if !exist {
				res.Remark = fmt.Sprintf("Can't find specified client instance of: %s", header.clientID)
				return res
			}
			var runningInfo *ConsumerRunningInfo
			if cli, ok := val.(*rmqClient); ok {
				runningInfo = cli.getConsumerRunningInfo(header.consumerGroup, header.jstackEnable)
			}
			if runningInfo == nil {
				res.Remark = "there is unexpected error when get running info, please check log"
				return res
			}
			data, err := runningInfo.Encode()
			if err != nil {
				// Keep the error code: there is no body to return.
				res.Remark = fmt.Sprintf("json marshal error: %s", err.Error())
				return res
			}
			res.Code = ResSuccess
			res.Body = data
			return res
		})

		// Direct consumption of a single message by the client named in the header.
		client.remoteClient.RegisterRequestFunc(ReqConsumeMessageDirectly, func(req *remote.RemotingCommand, addr net.Addr) *remote.RemotingCommand {
			rlog.Info("receive consume message directly request...", nil)
			header := new(ConsumeMessageDirectlyHeader)
			header.Decode(req.ExtFields)
			val, exist := clientMap.Load(header.clientID)
			res := remote.NewRemotingCommand(ResError, nil, nil)
			if !exist {
				res.Remark = fmt.Sprintf("Can't find specified client instance of: %s", header.clientID)
				return res
			}
			cli, ok := val.(*rmqClient)
			msgs := primitive.DecodeMessage(req.Body)
			// Guard the index below: a body that yields no messages must not
			// panic the handler.
			if len(msgs) == 0 {
				rlog.Warning("consumeMessageDirectly, decode message failed", nil)
				res.Remark = "decode message failed"
				return res
			}
			var consumeMessageDirectlyResult *ConsumeMessageDirectlyResult
			if ok {
				consumeMessageDirectlyResult = cli.consumeMessageDirectly(msgs[0], header.consumerGroup, header.brokerName)
			}
			if consumeMessageDirectlyResult == nil {
				res.Remark = "there is unexpected error when consume message directly, please check log"
				return res
			}
			data, err := consumeMessageDirectlyResult.Encode()
			if err != nil {
				// Keep the error code: there is no body to return.
				res.Remark = fmt.Sprintf("json marshal error: %s", err.Error())
				return res
			}
			res.Code = ResSuccess
			res.Body = data
			return res
		})

		// Offset reset: apply the offset table from the body to topic/group.
		client.remoteClient.RegisterRequestFunc(ReqResetConsumerOffset, func(req *remote.RemotingCommand, addr net.Addr) *remote.RemotingCommand {
			rlog.Info("receive reset consumer offset request...", map[string]interface{}{
				rlog.LogKeyBroker:        addr.String(),
				rlog.LogKeyTopic:         req.ExtFields["topic"],
				rlog.LogKeyConsumerGroup: req.ExtFields["group"],
				rlog.LogKeyTimeStamp:     req.ExtFields["timestamp"],
			})
			header := new(ResetOffsetHeader)
			header.Decode(req.ExtFields)

			body := new(ResetOffsetBody)
			body.Decode(req.Body)

			client.resetOffset(header.topic, header.group, body.OffsetTable)
			return nil
		})

		// Reply message pushed by the broker: rebuild a MessageExt from the
		// header and body, then pass it to getReplyMessageRequest.
		client.remoteClient.RegisterRequestFunc(ReqPushReplyMessageToClient, func(req *remote.RemotingCommand, addr net.Addr) *remote.RemotingCommand {
			// Arrival time in milliseconds since the Unix epoch.
			receiveTime := time.Now().UnixNano() / int64(time.Millisecond)
			rlog.Info("receive push reply to client request...", map[string]interface{}{
				rlog.LogKeyBroker:        addr.String(),
				rlog.LogKeyTopic:         req.ExtFields["topic"],
				rlog.LogKeyConsumerGroup: req.ExtFields["group"],
				rlog.LogKeyTimeStamp:     req.ExtFields["timestamp"],
			})

			header := new(ReplyMessageRequestHeader)
			header.Decode(req.ExtFields)

			var msgExt primitive.MessageExt
			msgExt.Topic = header.topic
			msgExt.Queue = &primitive.MessageQueue{
				QueueId: header.queueId,
				Topic:   header.topic,
			}
			msgExt.StoreTimestamp = header.storeTimestamp
			msgExt.BornHost = header.bornHost
			msgExt.StoreHost = header.storeHost

			// The body is uncompressed only when the compressed flag is set.
			body := req.Body
			if (header.sysFlag & primitive.FlagCompressed) == primitive.FlagCompressed {
				body = utils.UnCompress(req.Body)
			}
			msgExt.Body = body
			msgExt.Flag = header.flag
			msgExt.UnmarshalProperties([]byte(header.properties))
			msgExt.WithProperty(primitive.PropertyReplyMessageArriveTime, strconv.FormatInt(receiveTime, 10))
			msgExt.BornTimestamp = header.bornTimestamp
			msgExt.ReconsumeTimes = header.reconsumeTimes

			client.getReplyMessageRequest(&msgExt, header.bornHost)

			res := remote.NewRemotingCommand(ResError, nil, nil)
			res.Code = ResSuccess
			return res
		})

		// Consumer status query for a topic/group pair.
		client.remoteClient.RegisterRequestFunc(ReqGetConsumerStatsFromClient, func(req *remote.RemotingCommand, addr net.Addr) *remote.RemotingCommand {
			rlog.Info("receive get consumer status from client request...", map[string]interface{}{
				rlog.LogKeyBroker:        addr.String(),
				rlog.LogKeyTopic:         req.ExtFields["topic"],
				rlog.LogKeyConsumerGroup: req.ExtFields["group"],
			})

			header := new(GetConsumerStatusRequestHeader)
			header.Decode(req.ExtFields)
			res := remote.NewRemotingCommand(ResError, nil, nil)

			consumerStatus := client.getConsumerStatus(header.topic, header.group)
			if consumerStatus == nil {
				res.Remark = "there is unexpected error when get consumer status, please check log"
				return res
			}
			data, err := consumerStatus.Encode()
			if err != nil {
				// Keep the error code: there is no body to return.
				res.Remark = fmt.Sprintf("Failed to encode consumer status: %s", err.Error())
				return res
			}
			res.Code = ResSuccess
			res.Body = data
			return res
		})
	}
	// bundle this client to namesrv
	client.GetNameSrv().(*namesrvs).bundleClient = client
	return client
}

// sameNamesrvAddrs reports whether a and b hold the same addresses, ignoring
// order. When the lengths match, both slices are sorted in place.
func sameNamesrvAddrs(a, b []string) bool {
	if len(a) != len(b) {
		return false
	}
	sort.Strings(a)
	sort.Strings(b)
	for i := range a {
		if a[i] != b[i] {
			return false
		}
	}
	return true
}