void ManagementAgent::dispatchAgentCommand()

in src/qpid/management/ManagementAgent.cpp [2299:2370]


/**
 * Decode a management command message and hand it to the matching handler.
 *
 * The message is translated to an AMQP 0-10 transfer and its message
 * properties are inspected:
 *   - A message without a reply-to address is ignored.
 *   - A message whose content exceeds qmfV1BufferSize is logged at debug
 *     level and ignored.
 *   - If the app-id is "qmf2", the whole body is passed to the handler
 *     selected by the "qmf.opcode" application header; opcodes without a
 *     handler produce a warning.
 *   - Otherwise the body is read as a series of pre-V2 binary commands; each
 *     pass calls checkHeader() for an opcode and sequence number and stops
 *     as soon as checkHeader() reports failure.
 *
 * @param msg      incoming message; also the source of the publisher
 *                 identity used to build the management context.
 * @param viaLocal forwarded unchanged to the "qmf2" method-request and
 *                 query-request handlers.
 */
void ManagementAgent::dispatchAgentCommand(Message& msg, bool viaLocal)
{
    boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> transfer = protocols->translate(msg);
    const framing::MessageProperties* props = transfer ?
        transfer->getFrames().getHeaders()->get<framing::MessageProperties>() : 0;

    // No properties or no reply-to address: drop the message.
    if (!props || !props->hasReplyTo())
        return;

    const framing::ReplyTo& replyTo = props->getReplyTo();
    string rte = replyTo.getExchange();
    string rtk = replyTo.getRoutingKey();

    ResizableBuffer inBuffer(qmfV1BufferSize);

    if (transfer->getContentSize() > qmfV1BufferSize) {
        QPID_LOG(debug, "ManagementAgent::dispatchAgentCommandLH: Message too large: " <<
                 transfer->getContentSize());
        return;
    }

    // Copy the content in, note how many bytes were written, then rewind
    // so the handlers read from the start.
    inBuffer.putRawData(transfer->getContent());
    const uint32_t bufferLen = inBuffer.getPosition();
    inBuffer.reset();

    ScopedManagementContext context(msg.getPublisherIdentity());

    // props is known to be non-null from here on (see the guard above).
    const framing::FieldTable& appHeaders = props->getApplicationHeaders();
    if (props->getAppId() == "qmf2") {
        string qmfOpcode = appHeaders.getAsString("qmf.opcode");
        // Looked up but not consulted by the dispatch below.
        string contentType = appHeaders.getAsString("qmf.content");

        string body;
        inBuffer.getRawData(body, bufferLen);

        string cid;
        if (props->hasCorrelationId())
            cid = props->getCorrelationId();

        if (qmfOpcode == "_method_request") {
            handleMethodRequest(body, rte, rtk, cid, context.getUserId(), viaLocal);
        } else if (qmfOpcode == "_query_request") {
            handleGetQuery(body, rte, rtk, cid, context.getUserId(), viaLocal);
        } else if (qmfOpcode == "_agent_locate_request") {
            handleLocateRequest(body, rte, rtk, cid);
        } else {
            QPID_LOG(warning, "Support for QMF Opcode [" << qmfOpcode << "] TBD!!!");
        }
        return;
    }

    // Pre-V2 binary messages: keep reading commands until the buffer is
    // exhausted or a header fails validation.
    uint8_t v1Opcode;
    while (inBuffer.getPosition() < bufferLen) {
        uint32_t sequence;
        if (!checkHeader(inBuffer, &v1Opcode, &sequence))
            return;

        switch (v1Opcode) {
        case 'B': handleBrokerRequest (inBuffer, rtk, sequence); break;
        case 'P': handlePackageQuery  (inBuffer, rtk, sequence); break;
        case 'p': handlePackageInd    (inBuffer, rtk, sequence); break;
        case 'Q': handleClassQuery    (inBuffer, rtk, sequence); break;
        case 'q': handleClassInd      (inBuffer, rtk, sequence); break;
        case 'S': handleSchemaRequest (inBuffer, rte, rtk, sequence); break;
        case 's': handleSchemaResponse(inBuffer, rtk, sequence); break;
        case 'A': handleAttachRequest (inBuffer, rtk, sequence, msg.__getPublisherMgmtObject()); break;
        case 'G': handleGetQuery      (inBuffer, rtk, sequence, context.getMgmtId()); break;
        case 'M': handleMethodRequest (inBuffer, rtk, sequence, context.getMgmtId()); break;
        default:
            // Unrecognised opcodes are skipped; the loop moves on to the next header.
            break;
        }
    }
}