in src/qpid/broker/amqp/Session.cpp [247:378]
Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* terminus, bool incoming)
{
bool isQueueRequested = is_capability_requested(QUEUE, pn_terminus_capabilities(terminus));
bool isTopicRequested = is_capability_requested(TOPIC, pn_terminus_capabilities(terminus));
if (isTopicRequested && isQueueRequested) {
//requesting both renders each request meaningless
isQueueRequested = false;
isTopicRequested = false;
}
//check whether user is even allowed access to queues/topics before resolving
authorise.access(name, isQueueRequested, isTopicRequested);
ResolvedNode node(pn_terminus_is_dynamic(terminus));
if (isTopicRequested || !isQueueRequested) {
node.topic = connection.getTopics().get(name);
if (node.topic) node.exchange = node.topic->getExchange();
else node.exchange = connection.getBroker().getExchanges().find(name);
}
if (isQueueRequested || !isTopicRequested) {
node.queue = connection.getBroker().getQueues().find(name);
}
bool createOnDemand = is_capability_requested(CREATE_ON_DEMAND, pn_terminus_capabilities(terminus));
//Strictly speaking, properties should only be specified when the
//terminus is dynamic. However we will not enforce that here. If
//properties are set on the attach request, we will set them on
//our reply. This allows the 'create' and 'assert' options in the
//qpid messaging API to be implemented over 1.0.
node.properties.read(pn_terminus_properties(terminus));
if (node.exchange && createOnDemand && isTopicRequested) {
if (!node.properties.getSpecifiedExchangeType().empty() && node.properties.getExchangeType() != node.exchange->getType()) {
//emulate 0-10 exchange-declare behaviour
throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, "Exchange of different type already exists");
}
}
bool isCreateRequested = pn_terminus_is_dynamic(terminus) || createOnDemand;
bool isCreateQueueRequested = isCreateRequested && isQueueRequested;
bool isCreateTopicRequested = isCreateRequested && isTopicRequested;
if ((!node.queue && !node.exchange) || (!node.queue && isCreateQueueRequested) || (!node.exchange && isCreateTopicRequested)) {
if (isCreateRequested) {
//is it a queue or an exchange?
if (isTopicRequested) {
if (node.queue) {
QPID_LOG_CAT(warning, model, "Node name will be ambiguous, creation of exchange named " << name << " requested when queue of the same name already exists");
}
qpid::framing::FieldTable args;
qpid::amqp_0_10::translate(node.properties.getProperties(), args);
std::pair<boost::shared_ptr<Exchange>, bool> result
= connection.getBroker().createExchange(name, node.properties.getExchangeType(), node.properties.isDurable(), node.properties.isAutodelete(),
node.properties.getAlternateExchange(),
args, connection.getUserId(), connection.getId());
node.exchange = result.first;
node.created = result.second;
} else {
if (node.exchange) {
QPID_LOG_CAT(warning, model, "Node name will be ambiguous, creation of queue named " << name << " requested when exchange of the same name already exists");
}
std::pair<boost::shared_ptr<Queue>, bool> result
= connection.getBroker().createQueue(name, node.properties.getQueueSettings(), node.properties.isExclusive() ? this:0, node.properties.getAlternateExchange(), connection.getUserId(), connection.getId());
node.queue = result.first;
node.created = result.second;
}
} else {
boost::shared_ptr<NodePolicy> nodePolicy = connection.getNodePolicies().match(name);
if (nodePolicy) {
std::pair<boost::shared_ptr<Queue>, boost::shared_ptr<Topic> > result = nodePolicy->create(name, connection);
node.queue = result.first;
node.topic = result.second;
node.created = node.queue || node.topic;
if (node.topic) node.exchange = node.topic->getExchange();
if (node.queue) {
QPID_LOG(info, "Created queue " << name << " from policy with pattern " << nodePolicy->getPattern());
} else if (node.topic) {
QPID_LOG(info, "Created topic " << name << " from policy with pattern " << nodePolicy->getPattern());
} else {
QPID_LOG(debug, "Created neither a topic nor a queue for " << name << " from policy with pattern " << nodePolicy->getPattern());
}
} else {
size_t i = name.find('@');
if (i != std::string::npos && (i+1) < name.length()) {
std::string domain = name.substr(i+1);
std::string local = name.substr(0, i);
std::string id = (boost::format("%1%-%2%") % name % qpid::types::Uuid(true).str()).str();
//does this domain exist?
boost::shared_ptr<Domain> d = connection.getInterconnects().findDomain(domain);
if (d) {
node.relay = boost::shared_ptr<Relay>(new Relay(1000));
if (incoming) {
d->connect(false, id, name, local, connection, node.relay);
} else {
d->connect(true, id, local, name, connection, node.relay);
}
}
}
}
}
} else if (node.queue && node.topic) {
if (isTopicRequested) {
QPID_LOG_CAT(info, protocol, "Ambiguous node name; " << name << " could be queue or topic, topic requested");
node.queue.reset();
} else if (isQueueRequested) {
QPID_LOG_CAT(info, protocol, "Ambiguous node name; " << name << " could be queue or topic, queue requested");
node.exchange.reset();
node.topic.reset();
} else {
QPID_LOG_CAT(warning, protocol, "Ambiguous node name; " << name << " could be queue or topic, assuming topic");
node.queue.reset();
}
} else if (node.queue && node.exchange) {
if (isTopicRequested) {
QPID_LOG_CAT(info, protocol, "Ambiguous node name; " << name << " could be queue or topic, topic requested");
node.queue.reset();
} else if (isQueueRequested) {
QPID_LOG_CAT(info, protocol, "Ambiguous node name; " << name << " could be queue or topic, queue requested");
node.exchange.reset();
node.topic.reset();
} else {
QPID_LOG_CAT(warning, protocol, "Ambiguous node name; " << name << " could be queue or exchange, assuming queue");
node.exchange.reset();
}
}
if (node.properties.isExclusive() && node.queue) {
if (node.queue->setExclusiveOwner(this)) {
exclusiveQueues.insert(node.queue);
} else {
throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, std::string("Cannot grant exclusive access to ") + node.queue->getName());
}
}
return node;
}