void ManagementAgent::handleGetQuery()

in src/qpid/management/ManagementAgent.cpp [1920:2116]


/*
 * Handle a QMFv2 "GetQuery" request and send the matching "_query_response".
 *
 * The request body is decoded as a map.  Only queries whose "_what" element is the
 * string "OBJECT" are supported; anything else is answered through sendException().
 * Two forms of query are then recognised:
 *
 *   - "_object_id" present (as a map): the single matching object, if found, is
 *     returned in one "amqp/list" message.  If no such object exists an empty list
 *     is sent.
 *   - otherwise: every non-deleted object whose class name equals the "_class_name"
 *     from "_schema_id" (and whose package equals "_package_name" when one was
 *     supplied) is returned, split into messages of up to maxReplyObjs objects.
 *     All messages except the last carry a "partial" header.
 *
 * Parameters:
 *   body     - encoded request map (decoded with MapCodec::decode).
 *   rte, rtk - passed through to sendBuffer()/sendException() as the reply
 *              destination (presumably reply-to exchange and routing key -- confirm
 *              against the caller).
 *   cid      - passed through to sendBuffer()/sendException() and logged as "seq"
 *              (presumably the correlation id of the request).
 *   userId   - identity handed to the ACL module for authorisation.
 *   viaLocal - when true the "qmf.agent" header is "broker", otherwise name_address.
 *
 * Throws framing::UnauthorizedAccessException when an ACL module is installed and
 * it denies the query.
 */
void ManagementAgent::handleGetQuery(const string& body, const string& rte, const string& rtk, const string& cid, const std::string& userId, bool viaLocal)
{
    // NOTE(review): presumably folds recently added objects into managementObjects
    // so that the query sees them -- confirm against moveNewObjects().
    moveNewObjects();

    Variant::Map inMap;
    Variant::Map::const_iterator i;
    Variant::Map headers;
    AclModule* acl = broker->getAcl();

    MapCodec::decode(body, inMap);
    QPID_LOG(debug, "RECV GetQuery (v2): map=" << inMap << " seq=" << cid);

    // Headers shared by every response message sent from this function.
    headers["method"] = "response";
    headers["qmf.opcode"] = "_query_response";
    headers["qmf.content"] = "_data";
    headers["qmf.agent"] = viaLocal ? "broker" : name_address;

    /*
     * Unpack the _what element of the query.  Currently we only support OBJECT queries.
     */
    i = inMap.find("_what");
    if (i == inMap.end()) {
        sendException(rte, rtk, cid, "_what element missing in Query");
        return;
    }

    if (i->second.getType() != qpid::types::VAR_STRING) {
        sendException(rte, rtk, cid, "_what element is not a string");
        return;
    }

    if (i->second.asString() != "OBJECT") {
        sendException(rte, rtk, cid, "Query for _what => '" + i->second.asString() + "' not supported");
        return;
    }

    string className;
    string packageName;

    /*
     * Handle the _schema_id element, if supplied.  Entries of the wrong type are
     * ignored, leaving className/packageName empty.
     */
    i = inMap.find("_schema_id");
    if (i != inMap.end() && i->second.getType() == qpid::types::VAR_MAP) {
        const Variant::Map& schemaIdMap(i->second.asMap());

        Variant::Map::const_iterator s_iter = schemaIdMap.find("_class_name");
        if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::types::VAR_STRING)
            className = s_iter->second.asString();

        s_iter = schemaIdMap.find("_package_name");
        if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::types::VAR_STRING)
            packageName = s_iter->second.asString();
    }

    /*
     * Some values are only brought up to date on demand, just before they are
     * reported: memory information and the broker's uptime.
     */
    if (className == "memory")
        qpid::sys::MemStat::loadMemInfo(memstat.get());

    if (className == "broker") {
        uint64_t uptime = sys::Duration(startTime, sys::now());
        // dynamic_pointer_cast yields an empty pointer when the cast fails, so the
        // result must be checked before it is dereferenced.
        boost::shared_ptr<_qmf::Broker> brokerObject =
            boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObject());
        if (brokerObject) {
            brokerObject->set_uptime(uptime);
        } else {
            QPID_LOG(warning, "GetQuery: broker management object is not a _qmf::Broker, uptime not refreshed; seq=" << cid);
        }
    }

    /*
     * Unpack the _object_id element of the query if it is present.  If it is present, find that one
     * object and return it.  If it is not present, send a class-based result.
     */
    i = inMap.find("_object_id");
    if (i != inMap.end() && i->second.getType() == qpid::types::VAR_MAP) {
        Variant::List list_;
        ObjectId objId(i->second.asMap());

        // Hold objectLock only for the lookup; the shared pointer keeps the object
        // alive while it is encoded below without the lock.
        ManagementObject::shared_ptr object;
        {
            sys::Mutex::ScopedLock lock (objectLock);
            ManagementObjectMap::iterator iter = managementObjects.find(objId);
            if (iter != managementObjects.end())
                object = iter->second;
        }
        if (object) {
            if (acl != 0) {
                map<acl::Property, string> params;
                params[acl::PROP_SCHEMACLASS]   = object->getClassName();

                if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_QUERY, object->getObjectId().getV2Key(), &params)) {
                    throw framing::UnauthorizedAccessException(QPID_MSG("unauthorized-access: ACL denied QMF query of object " << object->getObjectId().getV2Key() << " from " << userId));
                }
            }
            if (object->getConfigChanged() || object->getInstChanged())
                object->setUpdateTime();

            // A deleted object is reported as an empty list rather than as data.
            if (!object->isDeleted()) {
                Variant::Map  map_;
                Variant::Map values;
                Variant::Map oidMap;

                object->writeTimestamps(map_);
                object->mapEncodeValues(values, true, true); // write both stats and properties
                objId.mapEncode(oidMap);
                map_["_values"] = values;
                map_["_object_id"] = oidMap;
                map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
                                                       object->getClassName(),
                                                       "_data",
                                                       object->getMd5Sum());
                list_.push_back(map_);
            }

            string content;

            ListCodec::encode(list_, content);
            sendBuffer(content, cid, headers, "amqp/list", rte, rtk);
            QPID_LOG(debug, "SENT QueryResponse (query by object_id) to=" << rte << "/" << rtk);
            return;
        }
        // Object not found: fall through to the empty response at the end.
    } else {
        // send class-based result.
        if (acl != 0) {
            map<acl::Property, string> params;
            params[acl::PROP_SCHEMACLASS]   = className;

            if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_QUERY, className /* class-wide query */, &params)) {
                throw framing::UnauthorizedAccessException(QPID_MSG("unauthorized-access: ACL denied QMF query of object class " << className << " from " << userId));
            }
        }
        Variant::List _list;      // list of batches, each batch itself a list of objects
        Variant::List _subList;   // batch currently being filled
        unsigned int objCount = 0;

        // Copy the object pointers while holding objectLock, then encode from the
        // copy so the lock is not held during the (potentially long) encoding.
        ManagementObjectVector localManagementObjects;
        {
            sys::Mutex::ScopedLock objLock(objectLock);
            std::transform(managementObjects.begin(), managementObjects.end(),
                           std::back_inserter(localManagementObjects),
                           boost::bind(&ManagementObjectMap::value_type::second, _1));
        }

        for (ManagementObjectVector::iterator iter = localManagementObjects.begin();
             iter != localManagementObjects.end();
             ++iter) {
            ManagementObject::shared_ptr object = *iter;
            if (object->getClassName() == className &&
                (packageName.empty() || object->getPackageName() == packageName)) {


                if (!object->isDeleted()) {
                    Variant::Map  map_;
                    Variant::Map values;
                    Variant::Map oidMap;

                    if (object->getConfigChanged() || object->getInstChanged())
                        object->setUpdateTime();

                    object->writeTimestamps(map_);
                    object->mapEncodeValues(values, true, true); // write both stats and properties
                    object->getObjectId().mapEncode(oidMap);

                    map_["_values"] = values;
                    map_["_object_id"] = oidMap;
                    map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
                                                           object->getClassName(),
                                                           "_data",
                                                           object->getMd5Sum());
                    _subList.push_back(map_);
                    // Close the current batch once it holds maxReplyObjs objects.
                    if (++objCount >= maxReplyObjs) {
                        objCount = 0;
                        _list.push_back(_subList);
                        _subList.clear();
                    }
                }
            }
        }

        // Keep the final, partly filled batch.
        if (!_subList.empty())
            _list.push_back(_subList);

        // Every batch except the last is flagged "partial" so the receiver knows
        // more messages follow for this query.
        headers["partial"] = Variant();
        string content;
        while (_list.size() > 1) {
            ListCodec::encode(_list.front().asList(), content);
            sendBuffer(content, cid, headers, "amqp/list", rte, rtk);
            _list.pop_front();
            QPID_LOG(debug, "SENT QueryResponse (partial, query by schema_id) to=" << rte << "/" << rtk << " len=" << content.length());
        }
        // The last message (an empty list when nothing matched) completes the query.
        headers.erase("partial");
        ListCodec::encode(_list.empty() ? Variant::List() : _list.front().asList(), content);
        sendBuffer(content, cid, headers, "amqp/list", rte, rtk);
        QPID_LOG(debug, "SENT QueryResponse (query by schema_id) to=" << rte << "/" << rtk << " len=" << content.length());
        return;
    }

    // Unrecognized query - Send empty message to indicate CommandComplete
    string content;
    ListCodec::encode(Variant::List(), content);
    sendBuffer(content, cid, headers, "amqp/list", rte, rtk);
    QPID_LOG(debug, "SENT QueryResponse (empty) to=" << rte << "/" << rtk);
}