in internal/client.go [608:688]
// SendHeartbeatToAllBrokerWithLock builds a single heartbeat payload describing
// every producer group and consumer group registered on this client and sends
// it synchronously to the broker addresses known to the name server cache.
//
// The whole operation runs while holding c.hbMutex, so concurrent callers are
// serialized. Nothing is sent when the client has neither producers nor
// consumers. When the client has no consumers, only addresses whose broker id
// is 0 are contacted (presumably the master; confirm against BrokerData).
//
// Each address is attempted independently with a 3 second timeout: a failure
// on one address is logged and the remaining addresses are still tried.
func (c *rmqClient) SendHeartbeatToAllBrokerWithLock() {
	c.hbMutex.Lock()
	defer c.hbMutex.Unlock()

	hbData := NewHeartbeatData(c.ClientID())

	// Collect one entry per registered producer group.
	c.producerMap.Range(func(key, value interface{}) bool {
		pData := producerData{
			GroupName: key.(string),
		}
		hbData.ProducerDatas.Add(pData)
		return true
	})

	// Collect one entry per registered consumer group, including its
	// current subscriptions.
	c.consumerMap.Range(func(key, value interface{}) bool {
		consumer := value.(InnerConsumer)
		cData := consumerData{
			GroupName:         key.(string),
			CType:             consumeType(consumer.GetcType()),
			MessageModel:      strings.ToUpper(consumer.GetModel()),
			Where:             consumer.GetWhere(),
			UnitMode:          consumer.IsUnitMode(),
			SubscriptionDatas: consumer.SubscriptionDataList(),
		}
		hbData.ConsumerDatas.Add(cData)
		return true
	})

	if hbData.ProducerDatas.Len() == 0 && hbData.ConsumerDatas.Len() == 0 {
		rlog.Info("sending heartbeat, but no producer and no consumer", map[string]interface{}{
			"clientId": hbData.ClientId,
		})
		return
	}

	ns := c.GetNameSrv().(*namesrvs)
	// hbData is not modified past this point, so the payload is encoded once
	// and shared by every request instead of being re-encoded per address.
	payload := hbData.encode()
	producerOnly := hbData.ConsumerDatas.Len() == 0

	ns.brokerAddressesMap.Range(func(key, value interface{}) bool {
		brokerName := key.(string)
		data := value.(*BrokerData)
		for id, addr := range data.BrokerAddresses {
			rlog.Debug("try to send heart beat to broker", map[string]interface{}{
				"brokerName": brokerName,
				"brokerId":   id,
				"brokerAddr": addr,
			})
			// Without consumers, only broker id 0 receives the heartbeat.
			if producerOnly && id != 0 {
				rlog.Debug("notice, will not send heart beat to broker", map[string]interface{}{
					"brokerName": brokerName,
					"brokerId":   id,
					"brokerAddr": addr,
				})
				continue
			}

			cmd := remote.NewRemotingCommand(ReqHeartBeat, nil, payload)
			ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
			response, err := c.remoteClient.InvokeSync(ctx, addr, cmd)
			// The call is synchronous, so the context can be released right
			// away; a defer here would pile up until the function returns.
			cancel()
			if err != nil {
				rlog.Warning("send heart beat to broker error", map[string]interface{}{
					"brokerName":             brokerName,
					"brokerId":               id,
					"brokerAddr":             addr,
					rlog.LogKeyUnderlayError: err,
				})
				// Move on to the next address: one unreachable address must
				// not prevent the others of this broker from being contacted.
				continue
			}

			if response.Code != ResSuccess {
				rlog.Warning("send heart beat to broker failed", map[string]interface{}{
					"brokerName":   brokerName,
					"brokerId":     id,
					"brokerAddr":   addr,
					"responseCode": response.Code,
					"remark":       response.Remark,
				})
				continue
			}

			ns.AddBrokerVersion(brokerName, addr, int32(response.Version))
			rlog.Debug("send heart beat to broker success", map[string]interface{}{
				"brokerName": brokerName,
				"brokerId":   id,
				"brokerAddr": addr,
			})
		}
		return true
	})
}