in src/qpid/management/ManagementAgent.cpp [2299:2370]
void ManagementAgent::dispatchAgentCommand(Message& msg, bool viaLocal)
{
string rte;
string rtk;
boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> transfer = protocols->translate(msg);
const framing::MessageProperties* p = transfer ?
transfer->getFrames().getHeaders()->get<framing::MessageProperties>() : 0;
if (p && p->hasReplyTo()) {
const framing::ReplyTo& rt = p->getReplyTo();
rte = rt.getExchange();
rtk = rt.getRoutingKey();
}
else
return;
ResizableBuffer inBuffer(qmfV1BufferSize);
uint8_t opcode;
if (transfer->getContentSize() > qmfV1BufferSize) {
QPID_LOG(debug, "ManagementAgent::dispatchAgentCommandLH: Message too large: " <<
transfer->getContentSize());
return;
}
inBuffer.putRawData(transfer->getContent());
uint32_t bufferLen = inBuffer.getPosition();
inBuffer.reset();
ScopedManagementContext context(msg.getPublisherIdentity());
const framing::FieldTable *headers = p ? &p->getApplicationHeaders() : 0;
if (headers && p->getAppId() == "qmf2")
{
string opcode = headers->getAsString("qmf.opcode");
string contentType = headers->getAsString("qmf.content");
string body;
string cid;
inBuffer.getRawData(body, bufferLen);
{
if (p && p->hasCorrelationId()) {
cid = p->getCorrelationId();
}
if (opcode == "_method_request")
return handleMethodRequest(body, rte, rtk, cid, context.getUserId(), viaLocal);
else if (opcode == "_query_request")
return handleGetQuery(body, rte, rtk, cid, context.getUserId(), viaLocal);
else if (opcode == "_agent_locate_request")
return handleLocateRequest(body, rte, rtk, cid);
}
QPID_LOG(warning, "Support for QMF Opcode [" << opcode << "] TBD!!!");
return;
}
// old preV2 binary messages
while (inBuffer.getPosition() < bufferLen) {
uint32_t sequence;
if (!checkHeader(inBuffer, &opcode, &sequence))
return;
if (opcode == 'B') handleBrokerRequest (inBuffer, rtk, sequence);
else if (opcode == 'P') handlePackageQuery (inBuffer, rtk, sequence);
else if (opcode == 'p') handlePackageInd (inBuffer, rtk, sequence);
else if (opcode == 'Q') handleClassQuery (inBuffer, rtk, sequence);
else if (opcode == 'q') handleClassInd (inBuffer, rtk, sequence);
else if (opcode == 'S') handleSchemaRequest (inBuffer, rte, rtk, sequence);
else if (opcode == 's') handleSchemaResponse (inBuffer, rtk, sequence);
else if (opcode == 'A') handleAttachRequest (inBuffer, rtk, sequence, msg.__getPublisherMgmtObject());
else if (opcode == 'G') handleGetQuery (inBuffer, rtk, sequence, context.getMgmtId());
else if (opcode == 'M') handleMethodRequest (inBuffer, rtk, sequence, context.getMgmtId());
}
}