in src/qpid/management/ManagementAgent.cpp [1920:2116]
void ManagementAgent::handleGetQuery(const string& body, const string& rte, const string& rtk, const string& cid, const std::string& userId, bool viaLocal)
{
moveNewObjects();
Variant::Map inMap;
Variant::Map::const_iterator i;
Variant::Map headers;
AclModule* acl = broker->getAcl();
MapCodec::decode(body, inMap);
QPID_LOG(debug, "RECV GetQuery (v2): map=" << inMap << " seq=" << cid);
headers["method"] = "response";
headers["qmf.opcode"] = "_query_response";
headers["qmf.content"] = "_data";
headers["qmf.agent"] = viaLocal ? "broker" : name_address;
/*
* Unpack the _what element of the query. Currently we only support OBJECT queries.
*/
i = inMap.find("_what");
if (i == inMap.end()) {
sendException(rte, rtk, cid, "_what element missing in Query");
return;
}
if (i->second.getType() != qpid::types::VAR_STRING) {
sendException(rte, rtk, cid, "_what element is not a string");
return;
}
if (i->second.asString() != "OBJECT") {
sendException(rte, rtk, cid, "Query for _what => '" + i->second.asString() + "' not supported");
return;
}
string className;
string packageName;
/*
* Handle the _schema_id element, if supplied.
*/
i = inMap.find("_schema_id");
if (i != inMap.end() && i->second.getType() == qpid::types::VAR_MAP) {
const Variant::Map& schemaIdMap(i->second.asMap());
Variant::Map::const_iterator s_iter = schemaIdMap.find("_class_name");
if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::types::VAR_STRING)
className = s_iter->second.asString();
s_iter = schemaIdMap.find("_package_name");
if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::types::VAR_STRING)
packageName = s_iter->second.asString();
}
if (className == "memory")
qpid::sys::MemStat::loadMemInfo(memstat.get());
if (className == "broker") {
uint64_t uptime = sys::Duration(startTime, sys::now());
boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObject())->set_uptime(uptime);
}
/*
* Unpack the _object_id element of the query if it is present. If it is present, find that one
* object and return it. If it is not present, send a class-based result.
*/
i = inMap.find("_object_id");
if (i != inMap.end() && i->second.getType() == qpid::types::VAR_MAP) {
Variant::List list_;
ObjectId objId(i->second.asMap());
ManagementObject::shared_ptr object;
{
sys::Mutex::ScopedLock lock (objectLock);
ManagementObjectMap::iterator iter = managementObjects.find(objId);
if (iter != managementObjects.end())
object = iter->second;
}
if (object) {
if (acl != 0) {
map<acl::Property, string> params;
params[acl::PROP_SCHEMACLASS] = object->getClassName();
if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_QUERY, object->getObjectId().getV2Key(), ¶ms)) {
throw framing::UnauthorizedAccessException(QPID_MSG("unauthorized-access: ACL denied QMF query of object " << object->getObjectId().getV2Key() << " from " << userId));
}
}
if (object->getConfigChanged() || object->getInstChanged())
object->setUpdateTime();
if (!object->isDeleted()) {
Variant::Map map_;
Variant::Map values;
Variant::Map oidMap;
object->writeTimestamps(map_);
object->mapEncodeValues(values, true, true); // write both stats and properties
objId.mapEncode(oidMap);
map_["_values"] = values;
map_["_object_id"] = oidMap;
map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
object->getClassName(),
"_data",
object->getMd5Sum());
list_.push_back(map_);
}
string content;
ListCodec::encode(list_, content);
sendBuffer(content, cid, headers, "amqp/list", rte, rtk);
QPID_LOG(debug, "SENT QueryResponse (query by object_id) to=" << rte << "/" << rtk);
return;
}
} else {
// send class-based result.
if (acl != 0) {
map<acl::Property, string> params;
params[acl::PROP_SCHEMACLASS] = className;
if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_QUERY, className /* class-wide query */, ¶ms)) {
throw framing::UnauthorizedAccessException(QPID_MSG("unauthorized-access: ACL denied QMF query of object class " << className << " from " << userId));
}
}
Variant::List _list;
Variant::List _subList;
unsigned int objCount = 0;
ManagementObjectVector localManagementObjects;
{
sys::Mutex::ScopedLock objLock(objectLock);
std::transform(managementObjects.begin(), managementObjects.end(),
std::back_inserter(localManagementObjects),
boost::bind(&ManagementObjectMap::value_type::second, _1));
}
for (ManagementObjectVector::iterator iter = localManagementObjects.begin();
iter != localManagementObjects.end();
iter++) {
ManagementObject::shared_ptr object = *iter;
if (object->getClassName() == className &&
(packageName.empty() || object->getPackageName() == packageName)) {
if (!object->isDeleted()) {
Variant::Map map_;
Variant::Map values;
Variant::Map oidMap;
if (object->getConfigChanged() || object->getInstChanged())
object->setUpdateTime();
object->writeTimestamps(map_);
object->mapEncodeValues(values, true, true); // write both stats and properties
object->getObjectId().mapEncode(oidMap);
map_["_values"] = values;
map_["_object_id"] = oidMap;
map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
object->getClassName(),
"_data",
object->getMd5Sum());
_subList.push_back(map_);
if (++objCount >= maxReplyObjs) {
objCount = 0;
_list.push_back(_subList);
_subList.clear();
}
}
}
}
if (_subList.size())
_list.push_back(_subList);
headers["partial"] = Variant();
string content;
while (_list.size() > 1) {
ListCodec::encode(_list.front().asList(), content);
sendBuffer(content, cid, headers, "amqp/list", rte, rtk);
_list.pop_front();
QPID_LOG(debug, "SENT QueryResponse (partial, query by schema_id) to=" << rte << "/" << rtk << " len=" << content.length());
}
headers.erase("partial");
ListCodec::encode(_list.size() ? _list.front().asList() : Variant::List(), content);
sendBuffer(content, cid, headers, "amqp/list", rte, rtk);
QPID_LOG(debug, "SENT QueryResponse (query by schema_id) to=" << rte << "/" << rtk << " len=" << content.length());
return;
}
// Unrecognized query - Send empty message to indicate CommandComplete
string content;
ListCodec::encode(Variant::List(), content);
sendBuffer(content, cid, headers, "amqp/list", rte, rtk);
QPID_LOG(debug, "SENT QueryResponse (empty) to=" << rte << "/" << rtk);
}