in internal/client.go [195:411]
// GetOrNewRocketMQClient returns the client registered in clientMap under the
// ClientID derived from option, creating and registering a new one when no
// client with that ID exists yet.
//
// If a client with the same ID is already registered, the name server
// addresses resolved from option.Namesrv are compared (ignoring order) with
// those of the registered client. On any difference an error is logged and nil
// is returned, so callers must check the result for nil.
//
// If a new client is stored, handlers for the broker-initiated requests
// (consumer-ids-changed notification, transaction state check, consumer
// running info, direct consumption, offset reset, reply message push and
// consumer status) are registered on its remoting client. Transaction checks
// that pass validation are sent to callbackCh as *CheckTransactionStateCallback.
//
// In both successful cases the returned client is set as the bundleClient of
// its name server.
func GetOrNewRocketMQClient(option ClientOptions, callbackCh chan interface{}) RMQClient {
	// NOTE(review): a remoting client is constructed even when an existing
	// client ends up being reused below; confirm NewRemotingClient holds no
	// resources that would need releasing in that case.
	client := &rmqClient{
		option:       option,
		remoteClient: remote.NewRemotingClient(option.RemotingClientConfig),
		done:         make(chan struct{}),
	}
	actual, loaded := clientMap.LoadOrStore(client.ClientID(), client)
	if loaded {
		// An instance with this ID already exists: reuse it, but only if it
		// was configured with the same set of name server addresses.
		client = actual.(*rmqClient)
		now := option.Namesrv.(*namesrvs).resolver.Resolve()
		old := client.GetNameSrv().(*namesrvs).resolver.Resolve()
		same := len(now) == len(old)
		if same {
			// Sort both lists so the comparison does not depend on order.
			sort.Strings(now)
			sort.Strings(old)
			for i := range now {
				if now[i] != old[i] {
					same = false
					break
				}
			}
		}
		if !same {
			rlog.Error("different namesrv option in the same instance", map[string]interface{}{
				"NewNameSrv":    now,
				"BeforeNameSrv": old,
			})
			return nil
		}
	} else {
		// Freshly stored client: register the handlers for requests that the
		// broker sends to the client.
		client.remoteClient.RegisterRequestFunc(ReqNotifyConsumerIdsChanged, func(req *remote.RemotingCommand, addr net.Addr) *remote.RemotingCommand {
			rlog.Info("receive broker's notification to consumer group", map[string]interface{}{
				rlog.LogKeyConsumerGroup: req.ExtFields["consumerGroup"],
			})
			client.RebalanceIfNotPaused()
			return nil
		})

		client.remoteClient.RegisterRequestFunc(ReqCheckTransactionState, func(req *remote.RemotingCommand, addr net.Addr) *remote.RemotingCommand {
			header := new(CheckTransactionStateRequestHeader)
			header.Decode(req.ExtFields)
			msgExts := primitive.DecodeMessage(req.Body)
			if len(msgExts) == 0 {
				rlog.Warning("checkTransactionState, decode message failed", nil)
				return nil
			}
			msgExt := msgExts[0]
			// TODO: add namespace support
			transactionID := msgExt.GetProperty(primitive.PropertyUniqueClientMessageIdKeyIndex)
			if len(transactionID) > 0 {
				msgExt.TransactionId = transactionID
			}
			group := msgExt.GetProperty(primitive.PropertyProducerGroup)
			if group == "" {
				rlog.Warning("checkTransactionState, pick producer group failed", nil)
				return nil
			}
			// Only checks addressed to this client's own group are forwarded.
			if option.GroupName != group {
				rlog.Warning("producer group is not equal", nil)
				return nil
			}
			callbackCh <- &CheckTransactionStateCallback{
				Addr:   addr,
				Msg:    msgExt,
				Header: *header,
			}
			return nil
		})

		client.remoteClient.RegisterRequestFunc(ReqGetConsumerRunningInfo, func(req *remote.RemotingCommand, addr net.Addr) *remote.RemotingCommand {
			rlog.Info("receive get consumer running info request...", nil)
			header := new(GetConsumerRunningInfoHeader)
			header.Decode(req.ExtFields)
			val, exist := clientMap.Load(header.clientID)
			// The response starts as an error and is only promoted to success
			// once a body has actually been produced.
			res := remote.NewRemotingCommand(ResError, nil, nil)
			if !exist {
				res.Remark = fmt.Sprintf("Can't find specified client instance of: %s", header.clientID)
				return res
			}
			var runningInfo *ConsumerRunningInfo
			if cli, ok := val.(*rmqClient); ok {
				runningInfo = cli.getConsumerRunningInfo(header.consumerGroup, header.jstackEnable)
			}
			if runningInfo == nil {
				res.Remark = "there is unexpected error when get running info, please check log"
				return res
			}
			data, err := runningInfo.Encode()
			if err != nil {
				// Keep the error code: a success reply without a body would
				// mislead the requester.
				res.Remark = fmt.Sprintf("json marshal error: %s", err.Error())
				return res
			}
			res.Code = ResSuccess
			res.Body = data
			return res
		})

		client.remoteClient.RegisterRequestFunc(ReqConsumeMessageDirectly, func(req *remote.RemotingCommand, addr net.Addr) *remote.RemotingCommand {
			rlog.Info("receive consume message directly request...", nil)
			header := new(ConsumeMessageDirectlyHeader)
			header.Decode(req.ExtFields)
			val, exist := clientMap.Load(header.clientID)
			res := remote.NewRemotingCommand(ResError, nil, nil)
			if !exist {
				res.Remark = fmt.Sprintf("Can't find specified client instance of: %s", header.clientID)
				return res
			}
			var result *ConsumeMessageDirectlyResult
			if cli, ok := val.(*rmqClient); ok {
				// Guard against an empty decode result instead of indexing
				// blindly, which would panic on a malformed or empty body.
				msgs := primitive.DecodeMessage(req.Body)
				if len(msgs) == 0 {
					rlog.Warning("consumeMessageDirectly, decode message failed", nil)
				} else {
					result = cli.consumeMessageDirectly(msgs[0], header.consumerGroup, header.brokerName)
				}
			}
			if result == nil {
				res.Remark = "there is unexpected error when consume message directly, please check log"
				return res
			}
			data, err := result.Encode()
			if err != nil {
				res.Remark = fmt.Sprintf("json marshal error: %s", err.Error())
				return res
			}
			res.Code = ResSuccess
			res.Body = data
			return res
		})

		client.remoteClient.RegisterRequestFunc(ReqResetConsumerOffset, func(req *remote.RemotingCommand, addr net.Addr) *remote.RemotingCommand {
			rlog.Info("receive reset consumer offset request...", map[string]interface{}{
				rlog.LogKeyBroker:        addr.String(),
				rlog.LogKeyTopic:         req.ExtFields["topic"],
				rlog.LogKeyConsumerGroup: req.ExtFields["group"],
				rlog.LogKeyTimeStamp:     req.ExtFields["timestamp"],
			})
			header := new(ResetOffsetHeader)
			header.Decode(req.ExtFields)
			body := new(ResetOffsetBody)
			body.Decode(req.Body)
			client.resetOffset(header.topic, header.group, body.OffsetTable)
			return nil
		})

		client.remoteClient.RegisterRequestFunc(ReqPushReplyMessageToClient, func(req *remote.RemotingCommand, addr net.Addr) *remote.RemotingCommand {
			// Arrival time in milliseconds, captured before any other work.
			receiveTime := time.Now().UnixNano() / int64(time.Millisecond)
			rlog.Info("receive push reply to client request...", map[string]interface{}{
				rlog.LogKeyBroker:        addr.String(),
				rlog.LogKeyTopic:         req.ExtFields["topic"],
				rlog.LogKeyConsumerGroup: req.ExtFields["group"],
				rlog.LogKeyTimeStamp:     req.ExtFields["timestamp"],
			})
			header := new(ReplyMessageRequestHeader)
			header.Decode(req.ExtFields)

			// Rebuild the reply message from the request header and body.
			var msgExt primitive.MessageExt
			msgExt.Topic = header.topic
			msgExt.Queue = &primitive.MessageQueue{
				QueueId: header.queueId,
				Topic:   header.topic,
			}
			msgExt.StoreTimestamp = header.storeTimestamp
			msgExt.BornHost = header.bornHost
			msgExt.StoreHost = header.storeHost
			body := req.Body
			if (header.sysFlag & primitive.FlagCompressed) == primitive.FlagCompressed {
				body = utils.UnCompress(req.Body)
			}
			msgExt.Body = body
			msgExt.Flag = header.flag
			msgExt.UnmarshalProperties([]byte(header.properties))
			msgExt.WithProperty(primitive.PropertyReplyMessageArriveTime, strconv.FormatInt(receiveTime, 10))
			msgExt.BornTimestamp = header.bornTimestamp
			msgExt.ReconsumeTimes = header.reconsumeTimes
			client.getReplyMessageRequest(&msgExt, header.bornHost)

			res := remote.NewRemotingCommand(ResError, nil, nil)
			res.Code = ResSuccess
			return res
		})

		client.remoteClient.RegisterRequestFunc(ReqGetConsumerStatsFromClient, func(req *remote.RemotingCommand, addr net.Addr) *remote.RemotingCommand {
			rlog.Info("receive get consumer status from client request...", map[string]interface{}{
				rlog.LogKeyBroker:        addr.String(),
				rlog.LogKeyTopic:         req.ExtFields["topic"],
				rlog.LogKeyConsumerGroup: req.ExtFields["group"],
			})
			header := new(GetConsumerStatusRequestHeader)
			header.Decode(req.ExtFields)
			res := remote.NewRemotingCommand(ResError, nil, nil)
			consumerStatus := client.getConsumerStatus(header.topic, header.group)
			if consumerStatus == nil {
				res.Remark = "there is unexpected error when get consumer status, please check log"
				return res
			}
			data, err := consumerStatus.Encode()
			if err != nil {
				res.Remark = fmt.Sprintf("Failed to encode consumer status: %s", err.Error())
				return res
			}
			res.Code = ResSuccess
			res.Body = data
			return res
		})
	}
	// bundle this client to namesrv
	client.GetNameSrv().(*namesrvs).bundleClient = client
	return client
}