Session::ResolvedNode Session::resolve()

in src/qpid/broker/amqp/Session.cpp [247:378]


// Resolve the node that a link terminus refers to.
//
// The terminus capabilities may ask explicitly for a queue or a topic, and
// may request creation (dynamic terminus or CREATE_ON_DEMAND). Depending on
// those flags and on what already exists under 'name', the returned
// ResolvedNode refers to an existing queue/topic/exchange, to a freshly
// created one, to whatever a matching node policy creates, or to a relay
// towards the domain named after an '@' in 'name'. When none of these apply
// the node is returned with nothing resolved.
//
// 'incoming' is only consulted on the relay path, where it selects the
// arguments handed to the domain's connect().
//
// Throws Exception(PRECONDITION_FAILED) when an on-demand topic is requested
// with an explicit exchange type differing from that of the existing
// exchange, or when exclusive ownership of the resolved queue is refused.
Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* terminus, bool incoming)
{
    bool wantsQueue = is_capability_requested(QUEUE, pn_terminus_capabilities(terminus));
    bool wantsTopic = is_capability_requested(TOPIC, pn_terminus_capabilities(terminus));
    if (wantsQueue && wantsTopic) {
        // Asking for both at once carries no information, so carry on as
        // though neither had been asked for.
        wantsQueue = false;
        wantsTopic = false;
    }

    // The access check comes first, before anything is looked up.
    authorise.access(name, wantsQueue, wantsTopic);

    ResolvedNode node(pn_terminus_is_dynamic(terminus));

    // Look for a topic/exchange unless a queue was specifically requested...
    if (wantsTopic || !wantsQueue) {
        node.topic = connection.getTopics().get(name);
        if (node.topic) {
            node.exchange = node.topic->getExchange();
        } else {
            node.exchange = connection.getBroker().getExchanges().find(name);
        }
    }
    // ...and for a queue unless a topic was specifically requested.
    if (wantsQueue || !wantsTopic) {
        node.queue = connection.getBroker().getQueues().find(name);
    }

    const bool onDemand = is_capability_requested(CREATE_ON_DEMAND, pn_terminus_capabilities(terminus));

    // Properties are, strictly, only meaningful on a dynamic terminus. That
    // is deliberately not enforced: whatever arrives on the attach is read
    // here (and echoed on the reply), which lets the 'create' and 'assert'
    // options of the qpid messaging API work over 1.0.
    node.properties.read(pn_terminus_properties(terminus));

    // Mirror 0-10 exchange-declare: an explicitly specified type must agree
    // with the type of an exchange that already exists.
    if (node.exchange && onDemand && wantsTopic
        && !node.properties.getSpecifiedExchangeType().empty()
        && node.properties.getExchangeType() != node.exchange->getType()) {
        throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, "Exchange of different type already exists");
    }

    const bool wantsCreate = pn_terminus_is_dynamic(terminus) || onDemand;
    const bool wantsQueueCreated = wantsCreate && wantsQueue;
    const bool wantsTopicCreated = wantsCreate && wantsTopic;

    // Something still has to be produced when nothing was found at all, or
    // when the specific kind whose creation was requested is the one missing.
    const bool queueMissing = !node.queue;
    const bool exchangeMissing = !node.exchange;
    const bool unresolved = (queueMissing && (exchangeMissing || wantsQueueCreated))
        || (exchangeMissing && wantsTopicCreated);

    if (unresolved) {
        if (!wantsCreate) {
            // No creation requested: try the node policies, then fall back
            // to treating 'local@domain' names as a relay to that domain.
            boost::shared_ptr<NodePolicy> policy = connection.getNodePolicies().match(name);
            if (policy) {
                std::pair<boost::shared_ptr<Queue>, boost::shared_ptr<Topic> > made = policy->create(name, connection);
                node.queue = made.first;
                node.topic = made.second;
                node.created = node.queue || node.topic;
                if (node.topic) {
                    node.exchange = node.topic->getExchange();
                }

                if (node.queue) {
                    QPID_LOG(info, "Created queue " << name << " from policy with pattern " << policy->getPattern());
                } else if (node.topic) {
                    QPID_LOG(info, "Created topic " << name << " from policy with pattern " << policy->getPattern());
                } else {
                    QPID_LOG(debug, "Created neither a topic nor a queue for " << name << " from policy with pattern " << policy->getPattern());
                }
            } else {
                const size_t at = name.find('@');
                // Only names with a non-empty part after the '@' qualify.
                if (at != std::string::npos && (at+1) < name.length()) {
                    std::string domainName = name.substr(at+1);
                    std::string localName = name.substr(0, at);
                    std::string linkId = (boost::format("%1%-%2%") % name % qpid::types::Uuid(true).str()).str();
                    // Relay only if the domain is actually known.
                    boost::shared_ptr<Domain> domain = connection.getInterconnects().findDomain(domainName);
                    if (domain) {
                        node.relay = boost::shared_ptr<Relay>(new Relay(1000));
                        if (!incoming) {
                            domain->connect(true, linkId, localName, name, connection, node.relay);
                        } else {
                            domain->connect(false, linkId, name, localName, connection, node.relay);
                        }
                    }
                }
            }
        } else if (wantsTopic) {
            // Creation requested and a topic was asked for: declare an exchange.
            if (node.queue) {
                QPID_LOG_CAT(warning, model, "Node name will be ambiguous, creation of exchange named " << name << " requested when queue of the same name already exists");
            }
            qpid::framing::FieldTable arguments;
            qpid::amqp_0_10::translate(node.properties.getProperties(), arguments);
            std::pair<boost::shared_ptr<Exchange>, bool> declared
                = connection.getBroker().createExchange(name, node.properties.getExchangeType(), node.properties.isDurable(), node.properties.isAutodelete(),
                                                        node.properties.getAlternateExchange(),
                                                        arguments, connection.getUserId(), connection.getId());
            node.exchange = declared.first;
            node.created = declared.second;
        } else {
            // Creation requested without a topic preference: declare a queue.
            if (node.exchange) {
                QPID_LOG_CAT(warning, model, "Node name will be ambiguous, creation of queue named " << name << " requested when exchange of the same name already exists");
            }
            std::pair<boost::shared_ptr<Queue>, bool> declared
                = connection.getBroker().createQueue(name, node.properties.getQueueSettings(), node.properties.isExclusive() ? this:0, node.properties.getAlternateExchange(), connection.getUserId(), connection.getId());
            node.queue = declared.first;
            node.created = declared.second;
        }
    } else if (node.queue && (node.topic || node.exchange)) {
        // The name matches a queue as well as a topic or exchange; keep only
        // one of them. An explicit request decides; otherwise a topic wins
        // over the queue, but the queue wins over a plain exchange.
        if (wantsTopic) {
            QPID_LOG_CAT(info, protocol, "Ambiguous node name; " << name << " could be queue or topic, topic requested");
            node.queue.reset();
        } else if (wantsQueue) {
            QPID_LOG_CAT(info, protocol, "Ambiguous node name; " << name << " could be queue or topic, queue requested");
            node.exchange.reset();
            node.topic.reset();
        } else if (node.topic) {
            QPID_LOG_CAT(warning, protocol, "Ambiguous node name; " << name << " could be queue or topic, assuming topic");
            node.queue.reset();
        } else {
            QPID_LOG_CAT(warning, protocol, "Ambiguous node name; " << name << " could be queue or exchange, assuming queue");
            node.exchange.reset();
        }
    }

    // Exclusive access was asked for: this session must become the queue's
    // exclusive owner, and the queue is remembered in exclusiveQueues.
    if (node.properties.isExclusive() && node.queue) {
        if (!node.queue->setExclusiveOwner(this)) {
            throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, std::string("Cannot grant exclusive access to ") + node.queue->getName());
        }
        exclusiveQueues.insert(node.queue);
    }
    return node;
}