void ManagementAgent::periodicProcessing()

in src/qpid/management/ManagementAgent.cpp [702:1082]


//
//  Run one periodic management pass.
//
//  In order, this:
//    1. calls moveNewObjects();
//    2. when publishing, refreshes the broker uptime and memory statistics;
//    3. takes a private copy of the managed objects (under objectLock) and
//       clears their per-pass flags;
//    4. sends indications for pending deleted objects (QPID-2997: deletes go
//       out before updates);
//    5. when publishing, sends property/statistics updates batched per class,
//       with at most maxReplyObjs objects per indication;
//    6. purges orphaned agents if moveDeletedObjects() reported deletions;
//    7. sends the V1 and/or V2 heartbeat (sent even when publish is disabled).
//
//  Locking: userLock is NOT held for the whole pass.  It is acquired only
//  around deleteOrphanedAgentsLH() below.  Acquiring it at function scope as
//  well would lock the same mutex twice on this thread (a self-deadlock on a
//  non-recursive mutex) and, on a recursive mutex, would keep userLock held
//  across every sendBuffer() call and every objectLock acquisition.
//
void ManagementAgent::periodicProcessing (void)
{
// Minimum free space requested from the V1 buffer before encoding each object.
#define HEADROOM  4096
    debugSnapshot("Management agent periodic processing");
    string              routingKey;
    string sBuf;    // scratch buffer reused for each V1 property/statistics encoding

    moveNewObjects();

    //
    //  If we're publishing updates, get the latest memory statistics and uptime now
    //
    if (publish) {
        uint64_t uptime = sys::Duration(startTime, sys::now());
        boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObject())->set_uptime(uptime);
        qpid::sys::MemStat::loadMemInfo(memstat.get());
    }

    //
    //  Use a copy of the management object map to avoid holding the objectLock
    //
    ManagementObjectVector localManagementObjects;
    {
        sys::Mutex::ScopedLock objLock(objectLock);
        std::transform(managementObjects.begin(), managementObjects.end(),
                       std::back_inserter(localManagementObjects),
                       boost::bind(&ManagementObjectMap::value_type::second, _1));
    }

    //
    //  Clear the been-here flag on all objects in the map.
    //  If a client was added since the last pass, force every object to be published.
    //
    for (ManagementObjectVector::iterator iter = localManagementObjects.begin();
         iter != localManagementObjects.end();
         iter++) {
        ManagementObject::shared_ptr object = *iter;
        object->setFlags(0);
        if (clientWasAdded) {
            object->setForcePublish(true);
        }
    }

    clientWasAdded = false;

    // first send the pending deletes before sending updates.  This prevents a
    // "false delete" scenario: if an object was deleted then re-added during
    // the last poll cycle, it will have a delete entry and an active entry.
    // if we sent the active update first, _then_ the delete update, clients
    // would incorrectly think the object was deleted.  See QPID-2997
    //
    bool objectsDeleted = moveDeletedObjects();
    PendingDeletedObjsMap localPendingDeletedObjs;
    {
        // Take ownership of the pending deletes so they can be sent without objectLock.
        sys::Mutex::ScopedLock objLock(objectLock);
        localPendingDeletedObjs.swap(pendingDeletedObjs);
    }

    //
    // If we are not publishing updates, just clear the pending deletes.  There's no
    // need to tell anybody.
    //
    if (!publish)
        localPendingDeletedObjs.clear();

    ResizableBuffer msgBuffer(qmfV1BufferSize);
    if (!localPendingDeletedObjs.empty()) {
        for (PendingDeletedObjsMap::iterator mIter = localPendingDeletedObjs.begin();
             mIter != localPendingDeletedObjs.end();
             mIter++) {
            std::string packageName;
            std::string className;
            msgBuffer.reset();
            uint32_t v1Objs = 0;
            uint32_t v2Objs = 0;
            Variant::List list_;

            // The map key has the form "<package>:<class>".
            size_t pos = mIter->first.find(":");
            packageName = mIter->first.substr(0, pos);
            className = mIter->first.substr(pos+1);

            for (DeletedObjectList::iterator lIter = mIter->second.begin();
                 lIter != mIter->second.end(); lIter++) {
                msgBuffer.makeAvailable(HEADROOM); // Make sure there's buffer space.
                std::string oid = (*lIter)->objectId;
                if (!(*lIter)->encodedV1Config.empty()) {
                    encodeHeader(msgBuffer, 'c');
                    msgBuffer.putRawData((*lIter)->encodedV1Config);
                    QPID_LOG(trace, "Deleting V1 properties " << oid
                             << " len=" << (*lIter)->encodedV1Config.size());
                    v1Objs++;
                }
                if (!(*lIter)->encodedV1Inst.empty()) {
                    encodeHeader(msgBuffer, 'i');
                    msgBuffer.putRawData((*lIter)->encodedV1Inst);
                    QPID_LOG(trace, "Deleting V1 statistics " << oid
                             << " len=" <<  (*lIter)->encodedV1Inst.size());
                    v1Objs++;
                }
                // Flush the V1 batch once it has reached the per-message object limit.
                if (v1Objs >= maxReplyObjs) {
                    v1Objs = 0;
                    stringstream key;
                    key << "console.obj.1.0." << packageName << "." << className;
                    size_t contentSize = msgBuffer.getPosition();
                    sendBuffer(msgBuffer, mExchange, key.str());
                    QPID_LOG(debug, "SEND V1 Multicast ContentInd V1 (delete) to="
                             << key.str() << " len=" << contentSize);
                }

                if (!(*lIter)->encodedV2.empty()) {
                    QPID_LOG(trace, "Deleting V2 " << "map=" << (*lIter)->encodedV2);
                    list_.push_back((*lIter)->encodedV2);
                    // Flush the V2 batch once it has reached the per-message object limit.
                    if (++v2Objs >= maxReplyObjs) {
                        v2Objs = 0;

                        string content;
                        ListCodec::encode(list_, content);
                        list_.clear();
                        if (content.length()) {
                            stringstream key;
                            Variant::Map  headers;
                            key << "agent.ind.data." << keyifyNameStr(packageName)
                                << "." << keyifyNameStr(className)
                                << "." << vendorNameKey
                                << "." << productNameKey;
                            if (!instanceNameKey.empty())
                                key << "." << instanceNameKey;

                            headers["method"] = "indication";
                            headers["qmf.opcode"] = "_data_indication";
                            headers["qmf.content"] = "_data";
                            headers["qmf.agent"] = name_address;

                            sendBuffer(content, "", headers, "amqp/list", v2Topic, key.str(), 0);
                            QPID_LOG(debug, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length());
                        }
                    }
                }
            }  // end current list

            // send any remaining objects...

            if (v1Objs) {
                stringstream key;
                key << "console.obj.1.0." << packageName << "." << className;
                size_t contentSize = msgBuffer.getPosition();
                sendBuffer(msgBuffer, mExchange, key.str());
                QPID_LOG(debug, "SEND V1 Multicast ContentInd V1 (delete) to=" << key.str() << " len=" << contentSize);
            }

            if (!list_.empty()) {
                string content;
                ListCodec::encode(list_, content);
                list_.clear();
                if (content.length()) {
                    stringstream key;
                    Variant::Map  headers;
                    key << "agent.ind.data." << keyifyNameStr(packageName)
                        << "." << keyifyNameStr(className)
                        << "." << vendorNameKey
                        << "." << productNameKey;
                    if (!instanceNameKey.empty())
                        key << "." << instanceNameKey;

                    headers["method"] = "indication";
                    headers["qmf.opcode"] = "_data_indication";
                    headers["qmf.content"] = "_data";
                    headers["qmf.agent"] = name_address;

                    sendBuffer(content, "", headers, "amqp/list", v2Topic, key.str(), 0);
                    QPID_LOG(debug, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length());
                }
            }
        }  // end map
    }

    //
    // Process the entire object map.
    //
    // If publish is disabled, don't send any updates.
    //
    // Each iteration of this loop picks the first object not yet visited in
    // this pass (flags == 0), then batches it together with later unvisited
    // objects of the same class into one indication.  The loop ends when no
    // unvisited object remains.
    //
    while (publish) {
        msgBuffer.reset();
        Variant::List list_;
        uint32_t pcount;    // objects whose properties were included in this batch
        uint32_t scount;    // objects whose statistics were included in this batch
        uint32_t v1Objs, v2Objs;
        ManagementObjectVector::iterator baseIter;
        std::string packageName;
        std::string className;

        for (baseIter = localManagementObjects.begin();
             baseIter != localManagementObjects.end();
             baseIter++) {
            ManagementObject::shared_ptr baseObject = *baseIter;
            //
            //  Skip until we find a base object requiring processing...
            //
            if (baseObject->getFlags() == 0) {
                packageName = baseObject->getPackageName();
                className = baseObject->getClassName();
                break;
            }
        }

        if (baseIter == localManagementObjects.end())
            break;  // done - all objects processed

        pcount = scount = 0;
        v1Objs = 0;
        v2Objs = 0;
        list_.clear();
        msgBuffer.reset();

        for (ManagementObjectVector::iterator iter = baseIter;
             iter != localManagementObjects.end();
             iter++) {
            msgBuffer.makeAvailable(HEADROOM); // Make sure there's buffer space
            ManagementObject::shared_ptr baseObject = *baseIter;
            ManagementObject::shared_ptr object = *iter;
            bool send_stats, send_props;
            if (baseObject->isSameClass(*object) && object->getFlags() == 0) {
                // Mark as visited so later iterations of the outer loop skip it.
                object->setFlags(1);
                if (object->getConfigChanged() || object->getInstChanged())
                    object->setUpdateTime();

                // skip any objects marked deleted since our first pass.  Deal with them
                // on the next periodic cycle...
                if (object->isDeleted()) {
                    continue;
                }

                send_props = (object->getConfigChanged() || object->getForcePublish());
                send_stats = (object->hasInst() && (object->getInstChanged() || object->getForcePublish()));

                if (send_props && qmf1Support) {
                    size_t pos = msgBuffer.getPosition();
                    encodeHeader(msgBuffer, 'c');
                    sBuf.clear();
                    object->writeProperties(sBuf);
                    msgBuffer.putRawData(sBuf);
                    QPID_LOG(trace, "Changed V1 properties "
                             << object->getObjectId().getV2Key()
                             << " len=" << msgBuffer.getPosition()-pos);
                    ++v1Objs;
                }

                if (send_stats && qmf1Support) {
                    size_t pos = msgBuffer.getPosition();
                    encodeHeader(msgBuffer, 'i');
                    sBuf.clear();
                    object->writeStatistics(sBuf);
                    msgBuffer.putRawData(sBuf);
                    QPID_LOG(trace, "Changed V1 statistics "
                             << object->getObjectId().getV2Key()
                             << " len=" << msgBuffer.getPosition()-pos);
                    ++v1Objs;
                }

                if ((send_stats || send_props) && qmf2Support) {
                    Variant::Map  map_;
                    Variant::Map values;
                    Variant::Map oid;

                    object->getObjectId().mapEncode(oid);
                    map_["_object_id"] = oid;
                    map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
                                                           object->getClassName(),
                                                           "_data",
                                                           object->getMd5Sum());
                    object->writeTimestamps(map_);
                    object->mapEncodeValues(values, send_props, send_stats);
                    map_["_values"] = values;
                    list_.push_back(map_);
                    v2Objs++;
                    QPID_LOG(trace, "Changed V2"
                             << (send_stats? " statistics":"")
                             << (send_props? " properties":"")
                             << " map=" << map_);
                }

                if (send_props) pcount++;
                if (send_stats) scount++;

                object->setForcePublish(false);

                if ((qmf1Support && (v1Objs >= maxReplyObjs)) ||
                    (qmf2Support && (v2Objs >= maxReplyObjs)))
                    break;  // have enough objects, send an indication...
            }
        }

        // Only send when at least one object contributed properties or statistics.
        if (pcount || scount) {
            if (qmf1Support) {
                if (msgBuffer.getPosition() > 0) {
                    stringstream key;
                    key << "console.obj.1.0." << packageName << "." << className;
                    size_t contentSize = msgBuffer.getPosition();
                    sendBuffer(msgBuffer, mExchange, key.str());
                    QPID_LOG(debug, "SEND V1 Multicast ContentInd to=" << key.str()
                             << " props=" << pcount
                             << " stats=" << scount
                             << " len=" << contentSize);
                }
            }

            if (qmf2Support) {
                string content;
                ListCodec::encode(list_, content);
                if (content.length()) {
                    stringstream key;
                    Variant::Map  headers;
                    key << "agent.ind.data." << keyifyNameStr(packageName)
                        << "." << keyifyNameStr(className)
                        << "." << vendorNameKey
                        << "." << productNameKey;
                    if (!instanceNameKey.empty())
                        key << "." << instanceNameKey;

                    headers["method"] = "indication";
                    headers["qmf.opcode"] = "_data_indication";
                    headers["qmf.content"] = "_data";
                    headers["qmf.agent"] = name_address;

                    sendBuffer(content, "", headers, "amqp/list", v2Topic, key.str(), 0);
                    QPID_LOG(debug, "SEND Multicast ContentInd to=" << key.str()
                             << " props=" << pcount
                             << " stats=" << scount
                             << " len=" << content.length());
                }
            }
        }
    }  // end processing updates for all objects

    if (objectsDeleted) {
        // This is the only place in this function where userLock is taken.
        // NOTE(review): the "LH" suffix presumably means the callee expects the
        // lock to be held by the caller - confirm against its definition.
        sys::Mutex::ScopedLock lock (userLock);
        deleteOrphanedAgentsLH();
    }

    // heartbeat generation.  Note that heartbeats need to be sent even if publish is disabled.

    if (qmf1Support) {
        // Local fixed-size buffer; intentionally shadows the resizable msgBuffer above.
        char                msgChars[qmfV1BufferSize];
        Buffer msgBuffer(msgChars, qmfV1BufferSize);
        encodeHeader(msgBuffer, 'h');
        msgBuffer.putLongLong(sys::Duration::FromEpoch());

        routingKey = "console.heartbeat.1.0";
        sendBuffer(msgBuffer, mExchange, routingKey);
        QPID_LOG(debug, "SEND HeartbeatInd to=" << routingKey);
    }

    if (qmf2Support) {
        std::stringstream addr_key;

        addr_key << "agent.ind.heartbeat." << vendorNameKey << "." << productNameKey;
        if (!instanceNameKey.empty())
            addr_key << "." << instanceNameKey;

        Variant::Map map;
        Variant::Map headers;

        headers["method"] = "indication";
        headers["qmf.opcode"] = "_agent_heartbeat_indication";
        headers["qmf.agent"] = name_address;

        // Start from the agent's attribute map and add the per-heartbeat fields.
        map["_values"] = attrMap;
        map["_values"].asMap()["_timestamp"] = uint64_t(sys::Duration::FromEpoch());
        map["_values"].asMap()["_heartbeat_interval"] = interval;
        map["_values"].asMap()["_epoch"] = bootSequence;

        string content;
        MapCodec::encode(map, content);

        // Set TTL (in msecs) on outgoing heartbeat indications based on the interval
        // time to prevent stale heartbeats from getting to the consoles.
        sendBuffer(content, "", headers, "amqp/map", v2Topic, addr_key.str(), interval * 2 * 1000);

        QPID_LOG(debug, "SENT AgentHeartbeat name=" << name_address);
    }
}