in src/qpid/management/ManagementAgent.cpp [702:1082]
//
// Periodic management pass. In order, this function:
//   1. Calls moveNewObjects() and, when publishing, refreshes the broker
//      object's uptime and the memory statistics.
//   2. Takes a private copy of the managed objects, clears each object's
//      "been-here" flag, and forces a publish of every object if
//      clientWasAdded was set since the last pass.
//   3. Sends indications for pending deleted objects (V1 and/or V2, depending
//      on which encodings each deleted-object record carries), flushing a
//      message whenever maxReplyObjs entries have accumulated.
//   4. While publish is set, sends property/statistics updates one class at a
//      time, again limited to maxReplyObjs objects per message.
//   5. Calls deleteOrphanedAgentsLH() if moveDeletedObjects() reported deletions.
//   6. Sends a heartbeat for each enabled protocol (qmf1Support / qmf2Support);
//      heartbeats are sent even when publish is disabled.
//
// Locking as written: userLock is taken at function scope; objectLock is taken
// only inside the two short braced blocks that copy managementObjects and swap
// out pendingDeletedObjs.
//
void ManagementAgent::periodicProcessing (void)
{
// Free space requested from msgBuffer (via makeAvailable) before each object is
// encoded. Presumably a byte count -- confirm against ResizableBuffer.
#define HEADROOM 4096
// Function-scope lock on userLock.
sys::Mutex::ScopedLock lock (userLock);
debugSnapshot("Management agent periodic processing");
// routingKey is only used for the V1 heartbeat at the end of the function.
string routingKey;
// sBuf is a scratch string reused for each object's V1 property/statistics encoding.
string sBuf;
moveNewObjects();
//
// If we're publishing updates, get the latest memory statistics and uptime now
//
if (publish) {
uint64_t uptime = sys::Duration(startTime, sys::now());
// NOTE(review): the cast result is dereferenced without a null check; this
// relies on broker->GetManagementObject() really being a _qmf::Broker.
boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObject())->set_uptime(uptime);
qpid::sys::MemStat::loadMemInfo(memstat.get());
}
//
// Use a copy of the management object map to avoid holding the objectLock
// while objects are encoded and sent. Only the mapped values are copied.
//
ManagementObjectVector localManagementObjects;
{
sys::Mutex::ScopedLock objLock(objectLock);
std::transform(managementObjects.begin(), managementObjects.end(),
std::back_inserter(localManagementObjects),
boost::bind(&ManagementObjectMap::value_type::second, _1));
}
//
// Clear the been-here flag on all objects in the map. A flag value of 0 means
// "not yet handled in this pass"; the update loop below sets it to 1.
// If a client was added since the last pass, force every object to be published.
//
for (ManagementObjectVector::iterator iter = localManagementObjects.begin();
iter != localManagementObjects.end();
iter++) {
ManagementObject::shared_ptr object = *iter;
object->setFlags(0);
if (clientWasAdded) {
object->setForcePublish(true);
}
}
clientWasAdded = false;
// First send the pending deletes before sending updates. This prevents a
// "false delete" scenario: if an object was deleted then re-added during
// the last poll cycle, it will have a delete entry and an active entry.
// If we sent the active update first, _then_ the delete update, clients
// would incorrectly think the object was deleted. See QPID-2997
//
bool objectsDeleted = moveDeletedObjects();
// Take ownership of the pending deletes by swapping them into a local map, so
// objectLock is only held for the swap itself.
PendingDeletedObjsMap localPendingDeletedObjs;
{
sys::Mutex::ScopedLock objLock(objectLock);
localPendingDeletedObjs.swap(pendingDeletedObjs);
}
//
// If we are not publishing updates, just clear the pending deletes. There's no
// need to tell anybody.
//
if (!publish)
localPendingDeletedObjs.clear();
// Shared V1 encode buffer, used by both the delete pass and the update pass.
ResizableBuffer msgBuffer(qmfV1BufferSize);
if (!localPendingDeletedObjs.empty()) {
// One map entry per class; the mapped value is the list of deleted objects.
for (PendingDeletedObjsMap::iterator mIter = localPendingDeletedObjs.begin();
mIter != localPendingDeletedObjs.end();
mIter++) {
std::string packageName;
std::string className;
msgBuffer.reset();
// v1Objs counts V1 entries (config and inst are counted separately);
// v2Objs counts V2 entries. Both are reset after each mid-list flush.
uint32_t v1Objs = 0;
uint32_t v2Objs = 0;
Variant::List list_;
// The map key is split at the first ':' into package name and class name.
size_t pos = mIter->first.find(":");
packageName = mIter->first.substr(0, pos);
className = mIter->first.substr(pos+1);
for (DeletedObjectList::iterator lIter = mIter->second.begin();
lIter != mIter->second.end(); lIter++) {
msgBuffer.makeAvailable(HEADROOM); // Make sure there's buffer space.
std::string oid = (*lIter)->objectId;
// Pre-encoded V1 properties, if present, go out under a 'c' header.
if (!(*lIter)->encodedV1Config.empty()) {
encodeHeader(msgBuffer, 'c');
msgBuffer.putRawData((*lIter)->encodedV1Config);
QPID_LOG(trace, "Deleting V1 properties " << oid
<< " len=" << (*lIter)->encodedV1Config.size());
v1Objs++;
}
// Pre-encoded V1 statistics, if present, go out under an 'i' header.
if (!(*lIter)->encodedV1Inst.empty()) {
encodeHeader(msgBuffer, 'i');
msgBuffer.putRawData((*lIter)->encodedV1Inst);
QPID_LOG(trace, "Deleting V1 statistics " << oid
<< " len=" << (*lIter)->encodedV1Inst.size());
v1Objs++;
}
// Flush the V1 buffer once the batch limit is reached.
// NOTE(review): there is no explicit msgBuffer.reset() after this send, yet
// later iterations keep appending; presumably sendBuffer() leaves the buffer
// empty -- confirm.
if (v1Objs >= maxReplyObjs) {
v1Objs = 0;
stringstream key;
key << "console.obj.1.0." << packageName << "." << className;
// Capture the size before sending; it is only used for the log line.
size_t contentSize = msgBuffer.getPosition();
sendBuffer(msgBuffer, mExchange, key.str());
QPID_LOG(debug, "SEND V1 Multicast ContentInd V1 (delete) to="
<< key.str() << " len=" << contentSize);
}
// Pre-encoded V2 data, if present, is collected into list_ and flushed as one
// list-encoded message once the batch limit is reached.
if (!(*lIter)->encodedV2.empty()) {
QPID_LOG(trace, "Deleting V2 " << "map=" << (*lIter)->encodedV2);
list_.push_back((*lIter)->encodedV2);
if (++v2Objs >= maxReplyObjs) {
v2Objs = 0;
string content;
ListCodec::encode(list_, content);
list_.clear();
if (content.length()) {
stringstream key;
Variant::Map headers;
// Key: agent.ind.data.<package>.<class>.<vendor>.<product>[.<instance>]
key << "agent.ind.data." << keyifyNameStr(packageName)
<< "." << keyifyNameStr(className)
<< "." << vendorNameKey
<< "." << productNameKey;
if (!instanceNameKey.empty())
key << "." << instanceNameKey;
headers["method"] = "indication";
headers["qmf.opcode"] = "_data_indication";
headers["qmf.content"] = "_data";
headers["qmf.agent"] = name_address;
sendBuffer(content, "", headers, "amqp/list", v2Topic, key.str(), 0);
QPID_LOG(debug, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length());
}
}
}
} // end current list
// send any remaining objects...
if (v1Objs) {
stringstream key;
key << "console.obj.1.0." << packageName << "." << className;
size_t contentSize = msgBuffer.getPosition();
sendBuffer(msgBuffer, mExchange, key.str());
QPID_LOG(debug, "SEND V1 Multicast ContentInd V1 (delete) to=" << key.str() << " len=" << contentSize);
}
// Same V2 send as the mid-list flush above, for the leftover entries.
if (!list_.empty()) {
string content;
ListCodec::encode(list_, content);
list_.clear();
if (content.length()) {
stringstream key;
Variant::Map headers;
key << "agent.ind.data." << keyifyNameStr(packageName)
<< "." << keyifyNameStr(className)
<< "." << vendorNameKey
<< "." << productNameKey;
if (!instanceNameKey.empty())
key << "." << instanceNameKey;
headers["method"] = "indication";
headers["qmf.opcode"] = "_data_indication";
headers["qmf.content"] = "_data";
headers["qmf.agent"] = name_address;
sendBuffer(content, "", headers, "amqp/list", v2Topic, key.str(), 0);
QPID_LOG(debug, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length());
}
}
} // end map
}
//
// Process the entire object map.
//
// If publish is disabled, don't send any updates.
//
// Each pass of this loop handles one batch of objects belonging to a single
// class. The loop exits through the break below once no unflagged object is left.
//
while (publish) {
msgBuffer.reset();
Variant::List list_;
// pcount / scount: number of objects in this batch whose properties /
// statistics were selected for sending.
uint32_t pcount;
uint32_t scount;
uint32_t v1Objs, v2Objs;
ManagementObjectVector::iterator baseIter;
std::string packageName;
std::string className;
for (baseIter = localManagementObjects.begin();
baseIter != localManagementObjects.end();
baseIter++) {
ManagementObject::shared_ptr baseObject = *baseIter;
//
// Skip until we find a base object requiring processing...
// Its package and class name determine the routing keys for this batch.
//
if (baseObject->getFlags() == 0) {
packageName = baseObject->getPackageName();
className = baseObject->getClassName();
break;
}
}
if (baseIter == localManagementObjects.end())
break; // done - all objects processed
pcount = scount = 0;
v1Objs = 0;
v2Objs = 0;
// list_ and msgBuffer were already fresh/reset at the top of this pass.
list_.clear();
msgBuffer.reset();
// Walk forward from the base object, picking up every unflagged object of the
// same class until the batch limit is hit.
for (ManagementObjectVector::iterator iter = baseIter;
iter != localManagementObjects.end();
iter++) {
msgBuffer.makeAvailable(HEADROOM); // Make sure there's buffer space
ManagementObject::shared_ptr baseObject = *baseIter;
ManagementObject::shared_ptr object = *iter;
bool send_stats, send_props;
if (baseObject->isSameClass(*object) && object->getFlags() == 0) {
// Mark as handled for this pass, even if it turns out to be deleted below.
object->setFlags(1);
if (object->getConfigChanged() || object->getInstChanged())
object->setUpdateTime();
// skip any objects marked deleted since our first pass. Deal with them
// on the next periodic cycle...
if (object->isDeleted()) {
continue;
}
// Properties go out when changed or force-published; statistics additionally
// require that the object has instrumentation (hasInst()).
send_props = (object->getConfigChanged() || object->getForcePublish());
send_stats = (object->hasInst() && (object->getInstChanged() || object->getForcePublish()));
if (send_props && qmf1Support) {
// pos is only used to compute the encoded length for the log line.
size_t pos = msgBuffer.getPosition();
encodeHeader(msgBuffer, 'c');
sBuf.clear();
object->writeProperties(sBuf);
msgBuffer.putRawData(sBuf);
QPID_LOG(trace, "Changed V1 properties "
<< object->getObjectId().getV2Key()
<< " len=" << msgBuffer.getPosition()-pos);
++v1Objs;
}
if (send_stats && qmf1Support) {
size_t pos = msgBuffer.getPosition();
encodeHeader(msgBuffer, 'i');
sBuf.clear();
object->writeStatistics(sBuf);
msgBuffer.putRawData(sBuf);
QPID_LOG(trace, "Changed V1 statistics "
<< object->getObjectId().getV2Key()
<< " len=" << msgBuffer.getPosition()-pos);
++v1Objs;
}
// V2: one map per object carrying _object_id, _schema_id, the timestamps
// written by writeTimestamps(), and _values.
if ((send_stats || send_props) && qmf2Support) {
Variant::Map map_;
Variant::Map values;
Variant::Map oid;
object->getObjectId().mapEncode(oid);
map_["_object_id"] = oid;
map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
object->getClassName(),
"_data",
object->getMd5Sum());
object->writeTimestamps(map_);
object->mapEncodeValues(values, send_props, send_stats);
map_["_values"] = values;
list_.push_back(map_);
v2Objs++;
QPID_LOG(trace, "Changed V2"
<< (send_stats? " statistics":"")
<< (send_props? " properties":"")
<< " map=" << map_);
}
if (send_props) pcount++;
if (send_stats) scount++;
// The force-publish request has been honoured (or was not set); clear it.
object->setForcePublish(false);
// Objects not reached before this break keep flag 0 and are picked up by a
// later pass of the enclosing while loop.
if ((qmf1Support && (v1Objs >= maxReplyObjs)) ||
(qmf2Support && (v2Objs >= maxReplyObjs)))
break; // have enough objects, send an indication...
}
}
// Only send when at least one object in the batch had something to report.
if (pcount || scount) {
if (qmf1Support) {
if (msgBuffer.getPosition() > 0) {
stringstream key;
key << "console.obj.1.0." << packageName << "." << className;
size_t contentSize = msgBuffer.getPosition();
sendBuffer(msgBuffer, mExchange, key.str());
QPID_LOG(debug, "SEND V1 Multicast ContentInd to=" << key.str()
<< " props=" << pcount
<< " stats=" << scount
<< " len=" << contentSize);
}
}
if (qmf2Support) {
string content;
ListCodec::encode(list_, content);
if (content.length()) {
stringstream key;
Variant::Map headers;
// Key: agent.ind.data.<package>.<class>.<vendor>.<product>[.<instance>]
key << "agent.ind.data." << keyifyNameStr(packageName)
<< "." << keyifyNameStr(className)
<< "." << vendorNameKey
<< "." << productNameKey;
if (!instanceNameKey.empty())
key << "." << instanceNameKey;
headers["method"] = "indication";
headers["qmf.opcode"] = "_data_indication";
headers["qmf.content"] = "_data";
headers["qmf.agent"] = name_address;
sendBuffer(content, "", headers, "amqp/list", v2Topic, key.str(), 0);
QPID_LOG(debug, "SEND Multicast ContentInd to=" << key.str()
<< " props=" << pcount
<< " stats=" << scount
<< " len=" << content.length());
}
}
}
} // end processing updates for all objects
// NOTE(review): userLock is already held by the function-scope lock at the top
// of this function, so this acquires it a second time on the same thread. That
// is only safe if sys::Mutex allows recursive locking -- confirm, and consider
// dropping whichever of the two locks is not intended.
if (objectsDeleted) {
sys::Mutex::ScopedLock lock (userLock);
deleteOrphanedAgentsLH();
}
// heartbeat generation. Note that heartbeats need to be sent even if publish is disabled.
if (qmf1Support) {
// V1 heartbeat: an 'h' header followed by the value of sys::Duration::FromEpoch(),
// encoded into a local buffer that shadows the outer msgBuffer.
char msgChars[qmfV1BufferSize];
Buffer msgBuffer(msgChars, qmfV1BufferSize);
encodeHeader(msgBuffer, 'h');
msgBuffer.putLongLong(sys::Duration::FromEpoch());
routingKey = "console.heartbeat.1.0";
sendBuffer(msgBuffer, mExchange, routingKey);
QPID_LOG(debug, "SEND HeartbeatInd to=" << routingKey);
}
if (qmf2Support) {
// V2 heartbeat key: agent.ind.heartbeat.<vendor>.<product>[.<instance>]
std::stringstream addr_key;
addr_key << "agent.ind.heartbeat." << vendorNameKey << "." << productNameKey;
if (!instanceNameKey.empty())
addr_key << "." << instanceNameKey;
Variant::Map map;
Variant::Map headers;
headers["method"] = "indication";
headers["qmf.opcode"] = "_agent_heartbeat_indication";
headers["qmf.agent"] = name_address;
// Start from a copy of attrMap and add the per-heartbeat fields to it.
map["_values"] = attrMap;
map["_values"].asMap()["_timestamp"] = uint64_t(sys::Duration::FromEpoch());
map["_values"].asMap()["_heartbeat_interval"] = interval;
map["_values"].asMap()["_epoch"] = bootSequence;
string content;
MapCodec::encode(map, content);
// Set TTL (in msecs) on outgoing heartbeat indications based on the interval
// time to prevent stale heartbeats from getting to the consoles.
sendBuffer(content, "", headers, "amqp/map", v2Topic, addr_key.str(), interval * 2 * 1000);
QPID_LOG(debug, "SENT AgentHeartbeat name=" << name_address);
}
}