in src/qpid/broker/Bridge.cpp [103:201]
void Bridge::create(amqp_0_10::Connection& c)
{
detached = false; // Reset detached in case we are recovering.
conn = &c;
SessionHandler& sessionHandler = c.getChannel(channel);
sessionHandler.setErrorListener(shared_from_this());
if (args.i_srcIsLocal) {
if (args.i_dynamic)
throw Exception("Dynamic routing not supported for push routes");
// Point the bridging commands at the local connection handler
pushHandler.reset(new PushHandler(&c));
channelHandler.reset(new framing::ChannelHandler(channel, pushHandler.get()));
session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler));
peer.reset(new framing::AMQP_ServerProxy(*channelHandler));
session->attach(sessionName, false);
session->commandPoint(0,0);
} else {
sessionHandler.attachAs(sessionName);
// Point the bridging commands at the remote peer broker
peer.reset(new framing::AMQP_ServerProxy(sessionHandler.out));
}
if (args.i_srcIsLocal) sessionHandler.getSession()->disableReceiverTracking();
if (initialize) {
initialize(*this, sessionHandler); // custom subscription initializer supplied
} else {
// will a temp queue be created for this bridge?
const bool temp_queue = !args.i_srcIsQueue && !useExistingQueue;
// UI convention: user specifies 0 for infinite credit
const uint32_t credit = (args.i_credit == 0) ? LinkRegistry::INFINITE_CREDIT : args.i_credit;
// use explicit acks only for non-temp queues, useless for temp queues since they are
// destroyed when the session drops (can't resend unacked msgs)
const uint8_t ack_mode = (args.i_sync && !temp_queue) ? EXPLICIT_ACK : IMPLIED_ACK;
// configure command.sync frequency
FieldTable options;
uint32_t freq = 0;
if (ack_mode == EXPLICIT_ACK) { // user explicitly configured syncs
freq = uint32_t(args.i_sync);
} else if (credit && credit != LinkRegistry::INFINITE_CREDIT) {
// force occasional sync to keep from stalling due to lack of credit
freq = (credit + 1)/2;
}
if (freq)
options.setInt("qpid.sync_frequency", freq);
// create a subscription on the remote
if (args.i_srcIsQueue) {
peer->getMessage().subscribe(args.i_src, args.i_dest, ack_mode, 0, false, "", 0, options);
peer->getMessage().flow(args.i_dest, 0, credit); // message credit
peer->getMessage().flow(args.i_dest, 1, LinkRegistry::INFINITE_CREDIT); // byte credit
QPID_LOG(debug, "Activated bridge " << name << " for route from queue " << args.i_src << " to " << args.i_dest);
} else {
if (!useExistingQueue) {
FieldTable queueSettings;
if (args.i_tag.size()) {
queueSettings.setString("qpid.trace.id", args.i_tag);
} else {
const string& peerTag = c.getFederationPeerTag();
if (peerTag.size())
queueSettings.setString("qpid.trace.id", peerTag);
}
if (args.i_excludes.size()) {
queueSettings.setString("qpid.trace.exclude", args.i_excludes);
} else {
const string& localTag = link->getBroker()->getFederationTag();
if (localTag.size())
queueSettings.setString("qpid.trace.exclude", localTag);
}
bool durable = false;//should this be an arg, or would we use srcIsQueue for durable queues?
bool exclusive = true; // only exclusive if the queue is owned by the bridge
bool autoDelete = exclusive && !durable;//auto delete transient queues?
peer->getQueue().declare(queueName, altEx, false, durable, exclusive, autoDelete, queueSettings);
}
if (!args.i_dynamic)
peer->getExchange().bind(queueName, args.i_src, args.i_key, FieldTable());
peer->getMessage().subscribe(queueName, args.i_dest, ack_mode, 0, false, "", 0, options);
peer->getMessage().flow(args.i_dest, 0, credit);
peer->getMessage().flow(args.i_dest, 1, LinkRegistry::INFINITE_CREDIT);
if (args.i_dynamic) {
Exchange::shared_ptr exchange = link->getBroker()->getExchanges().get(args.i_src);
if (exchange.get() == 0)
throw Exception("Exchange not found for dynamic route");
exchange->registerDynamicBridge(this);
QPID_LOG(debug, "Activated bridge " << name << " for dynamic route for exchange " << args.i_src);
} else {
QPID_LOG(debug, "Activated bridge " << name << " for static route from exchange " << args.i_src << " to " << args.i_dest);
}
}
}
if (args.i_srcIsLocal) sessionHandler.getSession()->enableReceiverTracking();
}