in src/MQClientFactory.cpp [769:803]
/**
 * Sends the current heartbeat payload to the broker addresses known to this
 * client factory.
 *
 * The function returns early (logging a warning) when there are no broker
 * addresses, or when the prepared heartbeat contains neither producer nor
 * consumer data. When the heartbeat carries no consumer data, only addresses
 * registered under MASTER_ID are contacted.
 *
 * An MQException raised while sending to one address is logged and does not
 * stop the remaining addresses from being tried.
 */
void MQClientFactory::sendHeartbeatToAllBroker() {
  // Local copy of the address table: everything below iterates this copy,
  // not the table owned by the factory.
  BrokerAddrMAP brokerTable(getBrokerAddrMap());
  if (brokerTable.empty()) {
    LOG_WARN("sendheartbeat brokeradd is empty");
    return;
  }

  // The unique_ptr takes ownership of the object returned by
  // prepareHeartbeatData() and releases it on every exit path.
  unique_ptr<HeartbeatData> heartbeatData(prepareHeartbeatData());
  const bool producerEmpty = heartbeatData->isProducerDataSetEmpty();
  const bool consumerEmpty = heartbeatData->isConsumerDataSetEmpty();
  if (producerEmpty && consumerEmpty) {
    LOG_WARN("sendheartbeat heartbeatData empty");
    return;
  }

  SessionCredentials session_credentials;
  getSessionCredentialsFromOneOfProducerOrConsumer(session_credentials);

  for (auto& brokerEntry : brokerTable) {
    // Iterate the per-broker id -> address map in place; brokerTable is
    // already a private copy, so no further copy is needed.
    for (auto& addrEntry : brokerEntry.second) {
      // Without consumer data only the MASTER_ID address receives the
      // heartbeat.
      // NOTE(review): presumably producers only talk to master brokers -
      // confirm against the broker protocol.
      if (consumerEmpty && addrEntry.first != MASTER_ID) {
        continue;
      }

      string& addr = addrEntry.second;
      try {
        m_pClientAPIImpl->sendHeartbeat(addr, heartbeatData.get(), session_credentials);
      } catch (MQException& e) {
        // Best effort: log the failure and carry on with the next address.
        LOG_ERROR("%s", e.what());
      }
    }
  }
}