in src/qpid/broker/Broker.cpp [1187:1286]
Manageable::status_t Broker::queueRedirect(const std::string& srcQueue,
const std::string& tgtQueue,
const ConnectionIdentity* context)
{
Queue::shared_ptr srcQ(queues.find(srcQueue));
if (!srcQ) {
QPID_LOG(error, "Queue redirect failed: source queue not found: "
<< srcQueue);
return Manageable::STATUS_UNKNOWN_OBJECT;
}
if (!tgtQueue.empty()) {
// NonBlank target queue creates partnership
Queue::shared_ptr tgtQ(queues.find(tgtQueue));
if (!tgtQ) {
QPID_LOG(error, "Queue redirect failed: target queue not found: "
<< tgtQueue);
return Manageable::STATUS_UNKNOWN_OBJECT;
}
if (srcQueue.compare(tgtQueue) == 0) {
QPID_LOG(error, "Queue redirect source queue: "
<< tgtQueue << " cannot be its own target");
return Manageable::STATUS_USER;
}
if (srcQ->isAutoDelete()) {
QPID_LOG(error, "Queue redirect source queue: "
<< srcQueue << " is autodelete and can not be part of redirect");
return Manageable::STATUS_USER;
}
if (tgtQ->isAutoDelete()) {
QPID_LOG(error, "Queue redirect target queue: "
<< tgtQueue << " is autodelete and can not be part of redirect");
return Manageable::STATUS_USER;
}
if (srcQ->getRedirectPeer()) {
QPID_LOG(error, "Queue redirect source queue: "
<< srcQueue << " is already redirected");
return Manageable::STATUS_USER;
}
if (tgtQ->getRedirectPeer()) {
QPID_LOG(error, "Queue redirect target queue: "
<< tgtQueue << " is already redirected");
return Manageable::STATUS_USER;
}
if (acl) {
std::map<acl::Property, std::string> params;
params.insert(make_pair(acl::PROP_QUEUENAME, tgtQ->getName()));
if (!acl->authorise((context)?context->getUserId():"", acl::ACT_REDIRECT, acl::OBJ_QUEUE, srcQ->getName(), ¶ms))
throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied redirect request from " << ((context)?context->getUserId():"(unknown)")));
}
// Start the backup overflow partnership
srcQ->setRedirectPeer(tgtQ, true);
tgtQ->setRedirectPeer(srcQ, false);
// Set management state
srcQ->setMgmtRedirectState(tgtQueue, true, true);
tgtQ->setMgmtRedirectState(srcQueue, true, false);
// Management event
if (managementAgent.get()) {
managementAgent->raiseEvent(_qmf::EventQueueRedirect(srcQueue, tgtQueue));
}
QPID_LOG(info, "Queue redirect complete. queue: "
<< srcQueue << " target queue: " << tgtQueue);
return Manageable::STATUS_OK;
} else {
// Blank target queue destroys partnership
Queue::shared_ptr tgtQ(srcQ->getRedirectPeer());
if (!tgtQ) {
QPID_LOG(error, "Queue redirect source queue: "
<< srcQueue << " is not in redirected");
return Manageable::STATUS_USER;
}
if (!srcQ->isRedirectSource()) {
QPID_LOG(error, "Queue redirect source queue: "
<< srcQueue << " is not a redirect source");
return Manageable::STATUS_USER;
}
if (acl) {
std::map<acl::Property, std::string> params;
params.insert(make_pair(acl::PROP_QUEUENAME, tgtQ->getName()));
if (!acl->authorise((context)?context->getUserId():"", acl::ACT_REDIRECT, acl::OBJ_QUEUE, srcQ->getName(), ¶ms))
throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied redirect request from " << ((context)?context->getUserId():"(unknown)")));
}
queueRedirectDestroy(srcQ, tgtQ, true);
return Manageable::STATUS_OK;
}
}