in src/qpid/broker/amqp/Session.cpp [507:592]
/**
 * Attach an outgoing link whose source is the node called `name`.
 *
 * The node is looked up with resolve(name, source, false) and handled
 * according to what came back:
 *  - queue:    an OutgoingFromQueue is created directly on that queue, as a
 *              BROWSER or a CONSUMER;
 *  - exchange: a subscription queue is created (or obtained) through the
 *              broker, bound to the exchange via the filter, and consumed
 *              from;
 *  - relay:    an OutgoingFromRelay is created.
 * In each case the resulting Outgoing is stored in `outgoing` keyed by the
 * link, and the filter is written to the link's own source terminus.
 *
 * @param link   the link being attached
 * @param source terminus the requested capabilities, filter, distribution
 *               mode, durability, expiry policy and timeout are read from
 * @param name   name of the node to resolve
 *
 * @throws Exception PRECONDITION_FAILED when a CONSUMER is requested on a
 *         queue that has an exclusive owner other than this session
 * @throws Exception NOT_FOUND when the node is neither queue, exchange nor
 *         relay; the link's source terminus type is set to PN_UNSPECIFIED
 *         before throwing
 */
void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::string& name)
{
    ResolvedNode node = resolve(name, source, false);

    // The link's own source terminus: everything this method reports back is
    // written here, whereas the request details are read from 'source'.
    // NOTE(review): assumes pn_link_source() is a side-effect free accessor
    // returning the same terminus on every call - confirm against Proton.
    pn_terminus_t* const localSource = pn_link_source(link);

    if (node.queue) {
        setCapabilities(pn_terminus_capabilities(source), pn_terminus_capabilities(localSource), node.queue);
        node.properties.write(pn_terminus_properties(localSource), node.queue);
    } else if (node.exchange) {
        setCapabilities(pn_terminus_capabilities(source), pn_terminus_capabilities(localSource), node.exchange);
        node.properties.write(pn_terminus_properties(localSource), node.exchange);
    }

    Filter filter;
    filter.read(pn_terminus_filter(source));

    // Use the remote target's address if it has one, else the local target's;
    // 'target' stays empty when neither is set.
    const char* targetAddress = pn_terminus_get_address(pn_link_remote_target(link));
    if (!targetAddress) {
        targetAddress = pn_terminus_get_address(pn_link_target(link));
    }
    std::string target = targetAddress ? std::string(targetAddress) : std::string();

    // Nothing usable was resolved: reject the attach.
    if (!node.queue && !node.exchange && !node.relay) {
        pn_terminus_set_type(localSource, PN_UNSPECIFIED);
        throw Exception(qpid::amqp::error_conditions::NOT_FOUND, std::string("Node not found: ") + name);/*not-found*/
    }

    if (node.queue) {
        authorise.outgoing(node.queue);

        // Browse when the distribution mode read from 'source' is COPY or
        // the queue itself is browse-only; otherwise consume.
        const bool browsing = pn_terminus_get_distribution_mode(source) == PN_DIST_MODE_COPY
            || node.queue->isBrowseOnly();
        const SubscriptionType type = browsing ? BROWSER : CONSUMER;

        if (type == CONSUMER && node.queue->hasExclusiveOwner() && !node.queue->isExclusiveOwner(this)) {
            throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, std::string("Cannot consume from exclusive queue ") + node.queue->getName());
        }

        boost::shared_ptr<Outgoing> subscription(
            new OutgoingFromQueue(connection.getBroker(), name, target, node.queue, link, *this, out, type, false, node.trackControllingLink()));
        subscription->init();
        filter.apply(subscription);
        outgoing[link] = subscription;

        // Record on the link's source terminus which mode was chosen.
        pn_terminus_set_distribution_mode(localSource, browsing ? PN_DIST_MODE_COPY : PN_DIST_MODE_MOVE);
    } else if (node.exchange) {
        // Check access to the exchange separately, before any attempt to
        // create the subscription queue.
        authorise.access(node.exchange);

        const bool shared = is_capability_requested(SHARED, pn_terminus_capabilities(source));
        const bool durable = pn_terminus_get_durability(source);
        const bool autodelete = pn_terminus_get_expiry_policy(source) != PN_EXPIRE_NEVER;

        QueueSettings settings(durable, autodelete);
        std::string altExchange;
        if (node.topic) {
            // Start from the topic's policy; durability always comes from the
            // link, autodelete only if the policy did not already imply it.
            settings = node.topic->getPolicy();
            settings.durable = durable;
            settings.autodelete = settings.autodelete || autodelete;
            altExchange = node.topic->getAlternateExchange();
        }

        const std::string timeoutKey("qpid.auto_delete_timeout");
        if (settings.original.find(timeoutKey) == settings.original.end()) {
            // The policy specified no delay, so take it from the link.
            settings.autoDeleteDelay = pn_terminus_get_timeout(source);
            if (settings.autoDeleteDelay) {
                settings.original[timeoutKey] = settings.autoDeleteDelay;
            }
        }
        // A non-zero delay always turns autodelete on.
        if (settings.autoDeleteDelay) {
            settings.autodelete = true;
        }
        filter.configure(settings);

        // Shared subscriptions are named by link name alone (TODO: could
        // allow this to be overridden when access to link properties is
        // provided (PROTON-335)). Otherwise the container id is prefixed,
        // since container id plus link name is unique.
        std::stringstream queueName;
        if (!shared) {
            queueName << connection.getContainerId() << "_";
        }
        queueName << pn_link_name(link);

        boost::shared_ptr<qpid::broker::Queue> queue =
            connection.getBroker().createQueue(queueName.str(), settings, this, altExchange, connection.getUserId(), connection.getId()).first;
        if (!shared) {
            queue->setExclusiveOwner(this);
        }
        authorise.outgoing(node.exchange, queue, filter);
        filter.bind(node.exchange, queue);

        boost::shared_ptr<Outgoing> subscription(
            new OutgoingFromQueue(connection.getBroker(), name, target, queue, link, *this, out, CONSUMER, !shared, false));
        subscription->init();
        outgoing[link] = subscription;
    } else {
        // The guard above guarantees node.relay is set on this path.
        boost::shared_ptr<Outgoing> relayed(
            new OutgoingFromRelay(link, connection.getBroker(), *this, name, target, pn_link_name(link), node.relay));
        // Unlike the queue-backed cases, the link is registered before init().
        outgoing[link] = relayed;
        relayed->init();
    }

    filter.write(pn_terminus_filter(localSource));
    QPID_LOG(debug, "Outgoing link attached");
}