in src/qpid/broker/HeadersExchange.cpp [175:265]
bool HeadersExchange::bind(Queue::shared_ptr queue, const string& bindingKey, const FieldTable* args)
{
string fedOp(fedOpBind);
string fedTags;
string fedOrigin;
if (args) {
fedOp = args->getAsString(qpidFedOp);
fedTags = args->getAsString(qpidFedTags);
fedOrigin = args->getAsString(qpidFedOrigin);
}
bool propagate = false;
// The federation args get propagated directly, so we need to identify
// the non federation args in case a federated propagate is needed
FieldTable extra_args;
getNonFedArgs(args, extra_args);
if (fedOp.empty() || fedOp == fedOpBind) {
// x-match arg MUST be present for a bind call
std::string x_match_value = getMatch(args);
if (x_match_value != all && x_match_value != any) {
throw InternalErrorException(QPID_MSG("Invalid or missing x-match value binding to headers exchange. Must be a string [\"all\" or \"any\"]"));
}
Bindings::ConstPtr p = bindings.snapshot();
if (p.get()) {
MatchArgs matchArgs(queue, &extra_args);
MatchKey matchKey(queue, bindingKey);
for (std::vector<BoundKey>::const_iterator i = p->begin(); i != p->end(); ++i) {
if (matchKey(*i) && !matchArgs(*i)) {
throw InternalErrorException(QPID_MSG("Exchange: " << getName()
<< ", binding key: " << bindingKey
<< " Duplicate binding key not allowed." ));
}
}
}
{
Mutex::ScopedLock l(lock);
//NOTE: do not include the fed op/tags/origin in the
//arguments as when x-match is 'all' these would prevent
//matching (they are internally added properties
//controlling binding propagation but not relevant to
//actual routing)
Binding::shared_ptr binding (new Binding (bindingKey, queue, this, args ? *args : FieldTable()));
BoundKey bk(binding, extra_args);
if (bindings.add_unless(bk, MatchArgs(queue, &extra_args))) {
binding->startManagement();
propagate = bk.fedBinding.addOrigin(queue->getName(), fedOrigin);
if (mgmtExchange != 0) {
mgmtExchange->inc_bindingCount();
}
} else {
bk.fedBinding.addOrigin(queue->getName(), fedOrigin);
return false;
}
} // lock dropped
} else if (fedOp == fedOpUnbind) {
Mutex::ScopedLock l(lock);
FedUnbindModifier modifier(queue->getName(), fedOrigin);
bindings.modify_if(MatchKey(queue, bindingKey), modifier);
propagate = modifier.shouldPropagate;
if (modifier.shouldUnbind) {
unbind(queue, bindingKey, args);
}
} else if (fedOp == fedOpReorigin) {
Bindings::ConstPtr p = bindings.snapshot();
if (p.get())
{
Mutex::ScopedLock l(lock);
for (std::vector<BoundKey>::const_iterator i = p->begin(); i != p->end(); ++i)
{
if ((*i).fedBinding.hasLocal()) {
propagateFedOp( (*i).binding->key, string(), fedOpBind, string());
}
}
}
}
routeIVE();
if (propagate) {
FieldTable * prop_args = (extra_args.count() != 0 ? &extra_args : 0);
propagateFedOp(bindingKey, fedTags, fedOp, fedOrigin, prop_args);
}
return true;
}