void Session::setupOutgoing()

in src/qpid/broker/amqp/Session.cpp [507:592]


/**
 * Attach an outgoing link (messages flowing from this broker to the peer)
 * to the node identified by `name`.
 *
 * The node is looked up with resolve(name, source, false) and the link is
 * then served in one of three ways:
 *  - queue:    subscribe directly to the queue, as a browser or a consumer;
 *  - exchange: create a subscription queue, bind it to the exchange using
 *              the link's filter, and consume from that queue;
 *  - relay:    hand the link to an OutgoingFromRelay.
 *
 * @param link   the link being attached; it becomes the key in `outgoing`
 * @param source the source terminus whose requested settings (capabilities,
 *               filter, distribution mode, durability, expiry, timeout)
 *               are read
 * @param name   the node name to resolve
 *
 * @throws Exception PRECONDITION_FAILED when a consuming (non-browsing)
 *         subscription is requested on a queue that has an exclusive owner
 *         other than this session
 * @throws Exception NOT_FOUND when the resolved node is neither a queue,
 *         an exchange nor a relay
 */
void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::string& name)
{
    ResolvedNode node = resolve(name, source, false);

    // Write capabilities and node properties onto the link's local source
    // terminus for the two node kinds that have them.
    if (node.queue) {
        pn_terminus_t* const localSource = pn_link_source(link);
        setCapabilities(pn_terminus_capabilities(source), pn_terminus_capabilities(localSource), node.queue);
        node.properties.write(pn_terminus_properties(localSource), node.queue);
    } else if (node.exchange) {
        pn_terminus_t* const localSource = pn_link_source(link);
        setCapabilities(pn_terminus_capabilities(source), pn_terminus_capabilities(localSource), node.exchange);
        node.properties.write(pn_terminus_properties(localSource), node.exchange);
    }

    Filter filter;
    filter.read(pn_terminus_filter(source));

    // Prefer the remote target's address; fall back to the local target's.
    // The target string stays empty when neither terminus has an address.
    const char* address = pn_terminus_get_address(pn_link_remote_target(link));
    if (!address) {
        address = pn_terminus_get_address(pn_link_target(link));
    }
    std::string target(address ? address : "");

    if (node.queue) {
        authorise.outgoing(node.queue);

        // Browse if the source asked for copy distribution or the queue is browse-only.
        const bool browsing = (pn_terminus_get_distribution_mode(source) == PN_DIST_MODE_COPY)
            || node.queue->isBrowseOnly();
        const SubscriptionType type = browsing ? BROWSER : CONSUMER;

        // Only consumers are refused on a queue exclusively owned by someone else.
        if (!browsing && node.queue->hasExclusiveOwner() && !node.queue->isExclusiveOwner(this)) {
            throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED,
                            std::string("Cannot consume from exclusive queue ") + node.queue->getName());
        }

        boost::shared_ptr<Outgoing> subscription(
            new OutgoingFromQueue(connection.getBroker(), name, target, node.queue, link, *this, out,
                                  type, false, node.trackControllingLink()));
        subscription->init();
        filter.apply(subscription);
        outgoing[link] = subscription;

        // Set the distribution mode on the local source to match the subscription type chosen.
        pn_terminus_set_distribution_mode(pn_link_source(link),
                                          browsing ? PN_DIST_MODE_COPY : PN_DIST_MODE_MOVE);
    } else if (node.exchange) {
        // Separate access check before trying to create the queue.
        authorise.access(node.exchange);

        const bool shared = is_capability_requested(SHARED, pn_terminus_capabilities(source));
        const bool durable = pn_terminus_get_durability(source);
        const bool autodelete = pn_terminus_get_expiry_policy(source) != PN_EXPIRE_NEVER;

        QueueSettings settings(durable, autodelete);
        std::string altExchange;
        if (node.topic) {
            // Start from the topic's policy; durability always comes from the link.
            settings = node.topic->getPolicy();
            settings.durable = durable;
            // The link can turn autodelete on, but never off when the policy set it.
            settings.autodelete = settings.autodelete || autodelete;
            altExchange = node.topic->getAlternateExchange();
        }

        // Take the auto-delete delay from the link only when the policy did not specify one.
        const std::string timeoutKey("qpid.auto_delete_timeout");
        const bool policyHasTimeout = settings.original.find(timeoutKey) != settings.original.end();
        if (!policyHasTimeout) {
            settings.autoDeleteDelay = pn_terminus_get_timeout(source);
            if (settings.autoDeleteDelay) {
                settings.original[timeoutKey] = settings.autoDeleteDelay;
            }
        }
        // A non-zero delay forces autodelete on.
        if (settings.autoDeleteDelay) {
            settings.autodelete = true;
        }
        filter.configure(settings);

        // Shared subscriptions are named by link name alone (TODO: could allow
        // this to be overridden when access to link properties is provided
        // (PROTON-335)). Otherwise the container id is prefixed, since the
        // combination of container id and link name is unique.
        std::stringstream queueName;
        if (!shared) {
            queueName << connection.getContainerId() << "_";
        }
        queueName << pn_link_name(link);

        boost::shared_ptr<qpid::broker::Queue> subscriptionQueue(
            connection.getBroker().createQueue(queueName.str(), settings, this, altExchange,
                                               connection.getUserId(), connection.getId()).first);
        if (!shared) {
            subscriptionQueue->setExclusiveOwner(this);
        }
        authorise.outgoing(node.exchange, subscriptionQueue, filter);
        filter.bind(node.exchange, subscriptionQueue);

        boost::shared_ptr<Outgoing> subscription(
            new OutgoingFromQueue(connection.getBroker(), name, target, subscriptionQueue, link, *this, out,
                                  CONSUMER, !shared, false));
        subscription->init();
        outgoing[link] = subscription;
    } else if (node.relay) {
        // The link is registered before init() is called; that order is kept as is.
        boost::shared_ptr<Outgoing> relayed(
            new OutgoingFromRelay(link, connection.getBroker(), *this, name, target,
                                  pn_link_name(link), node.relay));
        outgoing[link] = relayed;
        relayed->init();
    } else {
        // Nothing to attach to: mark the local source unspecified and report not-found.
        pn_terminus_set_type(pn_link_source(link), PN_UNSPECIFIED);
        throw Exception(qpid::amqp::error_conditions::NOT_FOUND, std::string("Node not found: ") + name);
    }

    // Write the filter back onto the link's local source terminus.
    filter.write(pn_terminus_filter(pn_link_source(link)));
    QPID_LOG(debug, "Outgoing link attached");
}