void Bridge::create()

in src/qpid/broker/Bridge.cpp [103:201]


// Activate this bridge over connection `c`.
//
// Binds the bridge to the connection's session handler for `channel`, attaches
// a session named `sessionName`, and then either runs the custom `initialize`
// callback (when one is set) or issues the default declare/bind/subscribe/flow
// commands through `peer`.
//
// Throws Exception if a dynamic route is requested together with a local
// source (push route), or if the exchange named by args.i_src cannot be
// obtained from the local broker when setting up a dynamic route.
void Bridge::create(amqp_0_10::Connection& c)
{
    // A previous incarnation may have been detached; we are (re)starting now.
    detached = false;
    conn = &c;

    SessionHandler& sessionHandler = c.getChannel(channel);
    sessionHandler.setErrorListener(shared_from_this());

    if (!args.i_srcIsLocal) {
        // Pull route: bridging commands go out to the remote peer broker.
        sessionHandler.attachAs(sessionName);
        peer.reset(new framing::AMQP_ServerProxy(sessionHandler.out));
    } else {
        if (args.i_dynamic)
            throw Exception("Dynamic routing not supported for push routes");

        // Push route: bridging commands are aimed at the local connection handler.
        pushHandler.reset(new PushHandler(&c));
        channelHandler.reset(new framing::ChannelHandler(channel, pushHandler.get()));

        session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler));
        peer.reset(new framing::AMQP_ServerProxy(*channelHandler));

        session->attach(sessionName, false);
        session->commandPoint(0,0);

        // Receiver tracking stays off while the setup commands below are issued;
        // it is switched back on at the end of this function.
        sessionHandler.getSession()->disableReceiverTracking();
    }

    if (initialize) {
        // A custom subscription initializer was supplied; let it do the setup.
        initialize(*this, sessionHandler);
    } else {
        // A temporary queue is declared only for exchange sources without a
        // pre-existing queue to reuse.
        const bool usesTempQueue = !args.i_srcIsQueue && !useExistingQueue;

        // UI convention: a configured credit of 0 means "infinite".
        uint32_t messageCredit = args.i_credit;
        if (messageCredit == 0)
            messageCredit = LinkRegistry::INFINITE_CREDIT;

        // Explicit acks are pointless for temp queues: they are destroyed when
        // the session drops, so unacked messages could never be resent.
        const bool wantExplicitAcks = args.i_sync && !usesTempQueue;
        const uint8_t acceptMode = wantExplicitAcks ? EXPLICIT_ACK : IMPLIED_ACK;

        // Decide how often the subscription should be asked to sync.
        FieldTable subscribeOptions;
        uint32_t syncFrequency = 0;
        if (acceptMode == EXPLICIT_ACK) {
            // The user asked for syncs explicitly.
            syncFrequency = static_cast<uint32_t>(args.i_sync);
        } else if (messageCredit && messageCredit != LinkRegistry::INFINITE_CREDIT) {
            // Finite credit: sync now and then so the flow does not stall for
            // lack of credit.
            syncFrequency = (messageCredit + 1) / 2;
        }
        if (syncFrequency)
            subscribeOptions.setInt("qpid.sync_frequency", syncFrequency);

        if (!args.i_srcIsQueue) {
            // Exchange source: make sure there is a queue to subscribe to.
            if (!useExistingQueue) {
                FieldTable declareArgs;

                // Trace id: the configured tag wins, else the peer's federation tag.
                if (!args.i_tag.empty()) {
                    declareArgs.setString("qpid.trace.id", args.i_tag);
                } else {
                    const string& remoteTag = c.getFederationPeerTag();
                    if (!remoteTag.empty())
                        declareArgs.setString("qpid.trace.id", remoteTag);
                }

                // Trace exclusions: the configured list wins, else our own federation tag.
                if (!args.i_excludes.empty()) {
                    declareArgs.setString("qpid.trace.exclude", args.i_excludes);
                } else {
                    const string& ownTag = link->getBroker()->getFederationTag();
                    if (!ownTag.empty())
                        declareArgs.setString("qpid.trace.exclude", ownTag);
                }

                const bool durable = false;     // open question: make this an arg, or rely on srcIsQueue for durable queues?
                const bool exclusive = true;    // only exclusive if the queue is owned by the bridge
                const bool autoDelete = exclusive && !durable;  // auto delete transient queues?
                peer->getQueue().declare(queueName, altEx, false, durable, exclusive, autoDelete, declareArgs);
            }
            if (!args.i_dynamic)
                peer->getExchange().bind(queueName, args.i_src, args.i_key, FieldTable());
        }

        // Subscribe to the source queue itself, or to the bridge queue that
        // stands in for an exchange source, then grant credit.
        const string& subscribedQueue = args.i_srcIsQueue ? args.i_src : queueName;
        peer->getMessage().subscribe(subscribedQueue, args.i_dest, acceptMode, 0, false, "", 0, subscribeOptions);
        peer->getMessage().flow(args.i_dest, 0, messageCredit);                  // message credit
        peer->getMessage().flow(args.i_dest, 1, LinkRegistry::INFINITE_CREDIT);  // byte credit

        if (args.i_srcIsQueue) {
            QPID_LOG(debug, "Activated bridge " << name << " for route from queue " << args.i_src << " to " << args.i_dest);
        } else if (args.i_dynamic) {
            // Dynamic routes are registered with the local exchange of the same name.
            Exchange::shared_ptr localExchange = link->getBroker()->getExchanges().get(args.i_src);
            if (!localExchange)
                throw Exception("Exchange not found for dynamic route");
            localExchange->registerDynamicBridge(this);
            QPID_LOG(debug, "Activated bridge " << name << " for dynamic route for exchange " << args.i_src);
        } else {
            QPID_LOG(debug, "Activated bridge " << name << " for static route from exchange " << args.i_src << " to " << args.i_dest);
        }
    }

    // Setup finished: turn receiver tracking back on for push routes.
    if (args.i_srcIsLocal) sessionHandler.getSession()->enableReceiverTracking();
}