void ConsoleSessionImpl::handleAgentUpdate()

in src/qmf/ConsoleSession.cpp [468:589]


//
// Process an agent update (logged below as an "AgentHeartbeat").
//
//   agentName - name of the agent the update describes
//   content   - message body; only its "_values" entry is examined
//   msg       - the received message; only its correlation id is read
//
// If "content" has no "_values" entry the call does nothing.  Otherwise the
// attributes are copied (old-style keys renamed) and one of two paths runs:
//
//   * Correlation id "broker-locate": a fresh agent record is built and
//     stored in connectedBrokerAgent.  If no agent filter is set, or the
//     filter matches, the agent is also placed in the agent list and a
//     CONSOLE_AGENT_ADD event is enqueued.
//
//   * Any other correlation id: the update is dropped if it fails the agent
//     filter.  An agent not yet in the list is added with a
//     CONSOLE_AGENT_ADD event.  A known agent is touched, then checked for
//     an epoch change (CONSOLE_AGENT_RESTART) and for a newer schema
//     timestamp (CONSOLE_AGENT_SCHEMA_UPDATE, not sent when a restart was
//     reported by this same update).
//
void ConsoleSessionImpl::handleAgentUpdate(const string& agentName, const Variant::Map& content, const Message& msg)
{
    // Declared ahead of any lock guard so that it outlives the guard.
    Agent agent;
    const string correlationId(msg.getCorrelationId());

    const Variant::Map::const_iterator valuesPos(content.find("_values"));
    if (valuesPos == content.end())
        return;
    const Variant::Map& rawAttrs(valuesPos->second.asMap());

    //
    // Build "attrs" from the received map, renaming the old-style keys
    // ("epoch", "timestamp", "heartbeat_interval") to their current names.
    // Every other key is carried over untouched.
    //
    Variant::Map attrs;
    for (Variant::Map::const_iterator src = rawAttrs.begin(); src != rawAttrs.end(); ++src) {
        const string& key(src->first);
        if (key == "epoch") {
            attrs[protocol::AGENT_ATTR_EPOCH] = src->second;
            continue;
        }
        if (key == "timestamp") {
            attrs[protocol::AGENT_ATTR_TIMESTAMP] = src->second;
            continue;
        }
        if (key == "heartbeat_interval") {
            attrs[protocol::AGENT_ATTR_HEARTBEAT_INTERVAL] = src->second;
            continue;
        }
        attrs[key] = src->second;
    }

    //
    // The epoch stays at zero when the update does not carry one.
    //
    uint32_t epoch = 0;
    const Variant::Map::const_iterator epochPos(attrs.find(protocol::AGENT_ATTR_EPOCH));
    if (epochPos != attrs.end())
        epoch = epochPos->second.asUint32();

    if (correlationId == "broker-locate") {
        qpid::sys::Mutex::ScopedLock guard(lock);

        //
        // Build the agent record.  The epoch was handed to the constructor,
        // so it is not stored a second time as a plain attribute.
        //
        auto_ptr<AgentImpl> brokerImpl(new AgentImpl(agentName, epoch, *this));
        for (Variant::Map::const_iterator attr = attrs.begin(); attr != attrs.end(); ++attr) {
            if (attr->first != protocol::AGENT_ATTR_EPOCH)
                brokerImpl->setAttribute(attr->first, attr->second);
        }
        agent = Agent(brokerImpl.release());
        connectedBrokerAgent = agent;

        //
        // The agent filter only decides whether this agent is also listed;
        // connectedBrokerAgent has been recorded either way.
        //
        if (agentQuery && !agentQuery.matchesPredicate(attrs))
            return;

        connectedBrokerInAgentList = true;
        agents[agentName] = agent;

        //
        // Announce the newly listed agent.
        //
        auto_ptr<ConsoleEventImpl> addEvent(new ConsoleEventImpl(CONSOLE_AGENT_ADD));
        addEvent->setAgent(agent);
        enqueueEventLH(ConsoleEvent(addEvent.release()));
        return;
    }

    //
    // For every other update, apply the agent filter before doing anything
    // else.  This check and the trace message run before the lock is taken.
    //
    if (agentQuery && (!agentQuery.matchesPredicate(attrs)))
        return;

    QPID_LOG(trace, "RCVD AgentHeartbeat from an agent matching our filter: " << agentName);

    //
    // Everything from here to the end of the function runs with the lock held.
    //
    qpid::sys::Mutex::ScopedLock guard(lock);
    const map<string, Agent>::iterator known(agents.find(agentName));

    if (known == agents.end()) {
        //
        // First time this agent has been seen: create its record, list it
        // and announce it.
        //
        auto_ptr<AgentImpl> freshImpl(new AgentImpl(agentName, epoch, *this));
        for (Variant::Map::const_iterator attr = attrs.begin(); attr != attrs.end(); ++attr) {
            if (attr->first != protocol::AGENT_ATTR_EPOCH)
                freshImpl->setAttribute(attr->first, attr->second);
        }
        agent = Agent(freshImpl.release());
        agents[agentName] = agent;

        auto_ptr<ConsoleEventImpl> addEvent(new ConsoleEventImpl(CONSOLE_AGENT_ADD));
        addEvent->setAgent(agent);
        enqueueEventLH(ConsoleEvent(addEvent.release()));
        return;
    }

    //
    // The agent is already listed: refresh its record.
    //
    agent = known->second;
    AgentImpl& trackedImpl(AgentImplAccess::get(agent));
    trackedImpl.touch();

    //
    // An epoch different from the stored one is reported as a restart.
    //
    const bool restarted(trackedImpl.getEpoch() != epoch);
    if (restarted) {
        trackedImpl.setEpoch(epoch);
        auto_ptr<ConsoleEventImpl> restartEvent(new ConsoleEventImpl(CONSOLE_AGENT_RESTART));
        restartEvent->setAgent(agent);
        enqueueEventLH(ConsoleEvent(restartEvent.release()));
    }

    const Variant::Map::const_iterator schemaPos(attrs.find(protocol::AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP));
    if (schemaPos == attrs.end())
        return;

    //
    // Only a strictly newer schema timestamp counts as a schema update.
    // NOTE(review): this reads the stored attribute unconditionally; confirm
    // how AgentImpl::getAttribute behaves when the attribute was never set.
    //
    const uint64_t reportedTs(schemaPos->second.asUint64());
    if (reportedTs <= trackedImpl.getAttribute(protocol::AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP).asUint64())
        return;

    //
    // No separate schema event is sent when this update already produced a
    // restart event; the stored timestamp is still advanced.
    //
    if (!restarted) {
        auto_ptr<ConsoleEventImpl> schemaEvent(new ConsoleEventImpl(CONSOLE_AGENT_SCHEMA_UPDATE));
        schemaEvent->setAgent(agent);
        enqueueEventLH(ConsoleEvent(schemaEvent.release()));
    }
    trackedImpl.setAttribute(protocol::AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP, schemaPos->second);
}