func()

in internal/client.go [608:688]


// SendHeartbeatToAllBrokerWithLock builds a single heartbeat payload listing
// every producer group in producerMap and every consumer group in consumerMap,
// then sends it synchronously to the broker addresses known to the name server
// client.
//
// hbMutex is held for the whole call, so concurrent invocations are serialized.
// If the client has neither producers nor consumers, nothing is sent. When the
// client has no consumers, only addresses whose broker id is 0 receive the
// heartbeat (presumably the master; confirm against the broker id convention).
//
// Each request uses its own 3-second timeout. A failure on one address is
// logged and does not prevent the remaining addresses from being tried.
func (c *rmqClient) SendHeartbeatToAllBrokerWithLock() {
	c.hbMutex.Lock()
	defer c.hbMutex.Unlock()
	hbData := NewHeartbeatData(c.ClientID())

	// Producers are identified only by their group name.
	c.producerMap.Range(func(key, value interface{}) bool {
		pData := producerData{
			GroupName: key.(string),
		}
		hbData.ProducerDatas.Add(pData)
		return true
	})

	// Consumers additionally report their consume type, message model,
	// consume-from position, unit mode and current subscriptions.
	c.consumerMap.Range(func(key, value interface{}) bool {
		consumer := value.(InnerConsumer)
		cData := consumerData{
			GroupName:         key.(string),
			CType:             consumeType(consumer.GetcType()),
			MessageModel:      strings.ToUpper(consumer.GetModel()),
			Where:             consumer.GetWhere(),
			UnitMode:          consumer.IsUnitMode(),
			SubscriptionDatas: consumer.SubscriptionDataList(),
		}
		hbData.ConsumerDatas.Add(cData)
		return true
	})
	if hbData.ProducerDatas.Len() == 0 && hbData.ConsumerDatas.Len() == 0 {
		rlog.Info("sending heartbeat, but no producer and no consumer", map[string]interface{}{
			"clientId": hbData.ClientId,
		})
		return
	}
	c.GetNameSrv().(*namesrvs).brokerAddressesMap.Range(func(key, value interface{}) bool {
		brokerName := key.(string)
		data := value.(*BrokerData)
		for id, addr := range data.BrokerAddresses {
			rlog.Debug("try to send heart beat to broker", map[string]interface{}{
				"brokerName": brokerName,
				"brokerId":   id,
				"brokerAddr": addr,
			})
			// A producer-only client skips every address whose broker id is not 0.
			if hbData.ConsumerDatas.Len() == 0 && id != 0 {
				rlog.Debug("notice, will not send heart beat to broker", map[string]interface{}{
					"brokerName": brokerName,
					"brokerId":   id,
					"brokerAddr": addr,
				})
				continue
			}
			cmd := remote.NewRemotingCommand(ReqHeartBeat, nil, hbData.encode())

			ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
			response, err := c.remoteClient.InvokeSync(ctx, addr, cmd)
			// Release the timeout's resources as soon as the call returns; a
			// defer here would pile up until the whole function exits.
			cancel()
			if err != nil {
				rlog.Warning("send heart beat to broker error", map[string]interface{}{
					"brokerName":             brokerName,
					"brokerId":               id,
					"brokerAddr":             addr,
					rlog.LogKeyUnderlayError: err,
				})
				// Keep going: the other addresses of this broker must still
				// receive the heartbeat even though this one failed.
				continue
			}
			if response.Code == ResSuccess {
				c.GetNameSrv().(*namesrvs).AddBrokerVersion(brokerName, addr, int32(response.Version))
				rlog.Debug("send heart beat to broker success", map[string]interface{}{
					"brokerName": brokerName,
					"brokerId":   id,
					"brokerAddr": addr,
				})
			} else {
				rlog.Warning("send heart beat to broker failed", map[string]interface{}{
					"brokerName":   brokerName,
					"brokerId":     id,
					"brokerAddr":   addr,
					"responseCode": response.Code,
					"remark":       response.Remark,
				})
			}
		}
		return true
	})
}