in src/qmf/ConsoleSession.cpp [468:589]
void ConsoleSessionImpl::handleAgentUpdate(const string& agentName, const Variant::Map& content, const Message& msg)
{
Variant::Map::const_iterator iter;
Agent agent;
uint32_t epoch(0);
string cid(msg.getCorrelationId());
iter = content.find("_values");
if (iter == content.end())
return;
const Variant::Map& in_attrs(iter->second.asMap());
Variant::Map attrs;
//
// Copy the map from the message to "attrs". Translate any old-style
// keys to their new key values in the process.
//
for (iter = in_attrs.begin(); iter != in_attrs.end(); iter++) {
if (iter->first == "epoch")
attrs[protocol::AGENT_ATTR_EPOCH] = iter->second;
else if (iter->first == "timestamp")
attrs[protocol::AGENT_ATTR_TIMESTAMP] = iter->second;
else if (iter->first == "heartbeat_interval")
attrs[protocol::AGENT_ATTR_HEARTBEAT_INTERVAL] = iter->second;
else
attrs[iter->first] = iter->second;
}
iter = attrs.find(protocol::AGENT_ATTR_EPOCH);
if (iter != attrs.end())
epoch = iter->second.asUint32();
if (cid == "broker-locate") {
qpid::sys::Mutex::ScopedLock l(lock);
auto_ptr<AgentImpl> impl(new AgentImpl(agentName, epoch, *this));
for (iter = attrs.begin(); iter != attrs.end(); iter++)
if (iter->first != protocol::AGENT_ATTR_EPOCH)
impl->setAttribute(iter->first, iter->second);
agent = Agent(impl.release());
connectedBrokerAgent = agent;
if (!agentQuery || agentQuery.matchesPredicate(attrs)) {
connectedBrokerInAgentList = true;
agents[agentName] = agent;
//
// Enqueue a notification of the new agent.
//
auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_ADD));
eventImpl->setAgent(agent);
enqueueEventLH(ConsoleEvent(eventImpl.release()));
}
return;
}
//
// Check this agent against the agent filter. Exit if it doesn't match.
// (only if this isn't the connected broker agent)
//
if (agentQuery && (!agentQuery.matchesPredicate(attrs)))
return;
QPID_LOG(trace, "RCVD AgentHeartbeat from an agent matching our filter: " << agentName);
{
qpid::sys::Mutex::ScopedLock l(lock);
map<string, Agent>::iterator aIter = agents.find(agentName);
if (aIter == agents.end()) {
//
// This is a new agent. We have no current record of its existence.
//
auto_ptr<AgentImpl> impl(new AgentImpl(agentName, epoch, *this));
for (iter = attrs.begin(); iter != attrs.end(); iter++)
if (iter->first != protocol::AGENT_ATTR_EPOCH)
impl->setAttribute(iter->first, iter->second);
agent = Agent(impl.release());
agents[agentName] = agent;
//
// Enqueue a notification of the new agent.
//
auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_ADD));
eventImpl->setAgent(agent);
enqueueEventLH(ConsoleEvent(eventImpl.release()));
} else {
//
// This is a refresh of an agent we are already tracking.
//
bool detectedRestart(false);
agent = aIter->second;
AgentImpl& impl(AgentImplAccess::get(agent));
impl.touch();
if (impl.getEpoch() != epoch) {
//
// The agent has restarted since the last time we heard from it.
// Enqueue a notification.
//
impl.setEpoch(epoch);
auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_RESTART));
eventImpl->setAgent(agent);
enqueueEventLH(ConsoleEvent(eventImpl.release()));
detectedRestart = true;
}
iter = attrs.find(protocol::AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP);
if (iter != attrs.end()) {
uint64_t ts(iter->second.asUint64());
if (ts > impl.getAttribute(protocol::AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP).asUint64()) {
//
// The agent has added new schema entries since we last heard from it.
// Update the attribute and, if this doesn't accompany a restart, enqueue a notification.
//
if (!detectedRestart) {
auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_SCHEMA_UPDATE));
eventImpl->setAgent(agent);
enqueueEventLH(ConsoleEvent(eventImpl.release()));
}
impl.setAttribute(protocol::AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP, iter->second);
}
}
}
}
}