Manageable::status_t Broker::queueRedirect()

in src/qpid/broker/Broker.cpp [1187:1286]


/**
 * Create or remove a redirect partnership between two queues.
 *
 * A non-empty tgtQueue requests a new partnership with srcQueue as the
 * redirect source and tgtQueue as its peer. An empty tgtQueue requests
 * removal of the partnership in which srcQueue is currently the source.
 *
 * @param srcQueue name of the redirect source queue; must be registered.
 * @param tgtQueue name of the target queue, or empty to remove the
 *                 existing partnership of srcQueue.
 * @param context  identity of the requester, used for the ACL check; may
 *                 be null, in which case an empty user id is checked.
 *
 * @return STATUS_OK on success; STATUS_UNKNOWN_OBJECT when a named queue
 *         cannot be found; STATUS_USER when the request is rejected by
 *         one of the validation checks below.
 *
 * @throws framing::UnauthorizedAccessException when an ACL module is
 *         present and denies the redirect action.
 */
Manageable::status_t Broker::queueRedirect(const std::string& srcQueue,
                                           const std::string& tgtQueue,
                                           const ConnectionIdentity* context)
{
    Queue::shared_ptr source(queues.find(srcQueue));
    if (!source) {
        QPID_LOG(error, "Queue redirect failed: source queue not found: "
            << srcQueue);
        return Manageable::STATUS_UNKNOWN_OBJECT;
    }

    if (tgtQueue.empty()) {
        // No target named: tear down the partnership that the source
        // queue currently heads.
        Queue::shared_ptr peer(source->getRedirectPeer());
        if (!peer) {
            QPID_LOG(error, "Queue redirect source queue: "
                << srcQueue << " is not in redirected");
            return Manageable::STATUS_USER;
        }

        // Only the source side of a partnership may request its removal.
        if (!source->isRedirectSource()) {
            QPID_LOG(error, "Queue redirect source queue: "
                << srcQueue << " is not a redirect source");
            return Manageable::STATUS_USER;
        }

        if (acl) {
            std::map<acl::Property, std::string> aclParams;
            aclParams.insert(make_pair(acl::PROP_QUEUENAME, peer->getName()));
            if (!acl->authorise((context)?context->getUserId():"",
                                acl::ACT_REDIRECT, acl::OBJ_QUEUE,
                                source->getName(), &aclParams)) {
                throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied redirect request from " << ((context)?context->getUserId():"(unknown)")));
            }
        }

        queueRedirectDestroy(source, peer, true);
        return Manageable::STATUS_OK;
    }

    // A target was named: validate both ends, then build the partnership.
    Queue::shared_ptr target(queues.find(tgtQueue));
    if (!target) {
        QPID_LOG(error, "Queue redirect failed: target queue not found: "
            << tgtQueue);
        return Manageable::STATUS_UNKNOWN_OBJECT;
    }

    if (srcQueue == tgtQueue) {
        QPID_LOG(error, "Queue redirect source queue: "
            << tgtQueue << " cannot be its own target");
        return Manageable::STATUS_USER;
    }

    // Autodelete queues are rejected on either side of a redirect.
    if (source->isAutoDelete()) {
        QPID_LOG(error, "Queue redirect source queue: "
            << srcQueue << " is autodelete and can not be part of redirect");
        return Manageable::STATUS_USER;
    }
    if (target->isAutoDelete()) {
        QPID_LOG(error, "Queue redirect target queue: "
            << tgtQueue << " is autodelete and can not be part of redirect");
        return Manageable::STATUS_USER;
    }

    // A queue that already has a redirect peer cannot join another pair.
    if (source->getRedirectPeer()) {
        QPID_LOG(error, "Queue redirect source queue: "
            << srcQueue << " is already redirected");
        return Manageable::STATUS_USER;
    }
    if (target->getRedirectPeer()) {
        QPID_LOG(error, "Queue redirect target queue: "
            << tgtQueue << " is already redirected");
        return Manageable::STATUS_USER;
    }

    if (acl) {
        std::map<acl::Property, std::string> aclParams;
        aclParams.insert(make_pair(acl::PROP_QUEUENAME, target->getName()));
        if (!acl->authorise((context)?context->getUserId():"",
                            acl::ACT_REDIRECT, acl::OBJ_QUEUE,
                            source->getName(), &aclParams)) {
            throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied redirect request from " << ((context)?context->getUserId():"(unknown)")));
        }
    }

    // Link the two queues: the source is flagged as the redirect source,
    // the target as its (non-source) peer.
    source->setRedirectPeer(target, true);
    target->setRedirectPeer(source, false);

    // Mirror the new partnership in each queue's management state.
    source->setMgmtRedirectState(tgtQueue, true, true);
    target->setMgmtRedirectState(srcQueue, true, false);

    // Raise a management event when a management agent is available.
    if (managementAgent.get()) {
        managementAgent->raiseEvent(_qmf::EventQueueRedirect(srcQueue, tgtQueue));
    }

    QPID_LOG(info, "Queue redirect complete. queue: "
        << srcQueue << " target queue: " << tgtQueue);
    return Manageable::STATUS_OK;
}