bool HeadersExchange::bind()

in src/qpid/broker/HeadersExchange.cpp [175:265]


// Binds `queue` to this headers exchange under `bindingKey`, or carries out
// the federation operation named by the qpidFedOp entry of `args`.
//
//  - no op / fedOpBind : check x-match, reject a conflicting duplicate key,
//                        then add the binding unless add_unless() finds an
//                        entry accepted by MatchArgs(queue, non-fed args).
//  - fedOpUnbind       : apply FedUnbindModifier to the entries selected by
//                        MatchKey(queue, bindingKey) and call unbind() when
//                        the modifier requests it.
//  - fedOpReorigin     : issue a fedOpBind propagation for every binding whose
//                        fedBinding reports hasLocal().
//  - any other op      : no binding work is done.
//
// routeIVE() is invoked on every path that reaches the end of the function,
// followed by propagateFedOp() when propagation was requested.
//
// Returns false only when add_unless() does not add the binding; every other
// non-throwing path returns true.
// Throws InternalErrorException when the x-match value is neither `all` nor
// `any`, and when an existing entry satisfies MatchKey but not MatchArgs.
bool HeadersExchange::bind(Queue::shared_ptr queue, const string& bindingKey, const FieldTable* args)
{
    // Federation control values. When `args` is supplied all three are taken
    // from it (the op may then be empty, which is treated as a bind below).
    string opName(fedOpBind);
    string tagList;
    string originName;
    if (args) {
        opName = args->getAsString(qpidFedOp);
        tagList = args->getAsString(qpidFedTags);
        originName = args->getAsString(qpidFedOrigin);
    }

    bool propagate = false;

    // Federation args are propagated directly, so the remaining
    // (non-federation) args are collected separately in case a federated
    // propagate is needed.
    FieldTable routingArgs;
    getNonFedArgs(args, routingArgs);

    const bool isBindOp = opName.empty() || opName == fedOpBind;
    if (isBindOp) {
        // A bind call MUST carry a valid x-match argument.
        const std::string matchMode = getMatch(args);
        const bool matchModeValid = (matchMode == all) || (matchMode == any);
        if (!matchModeValid) {
            throw InternalErrorException(QPID_MSG("Invalid or missing x-match value binding to headers exchange. Must be a string [\"all\" or \"any\"]"));
        }

        // Scan the current snapshot: an entry accepted by the key matcher but
        // rejected by the args matcher is reported as a duplicate key.
        Bindings::ConstPtr existing = bindings.snapshot();
        if (existing.get()) {
            MatchArgs sameArgs(queue, &routingArgs);
            MatchKey sameKey(queue, bindingKey);
            std::vector<BoundKey>::const_iterator it = existing->begin();
            for (; it != existing->end(); ++it) {
                const bool conflicting = sameKey(*it) && !sameArgs(*it);
                if (conflicting) {
                    throw InternalErrorException(QPID_MSG("Exchange: " << getName()
                        << ", binding key: " << bindingKey
                        << " Duplicate binding key not allowed." ));
                }
            }
        }

        {
            Mutex::ScopedLock guard(lock);
            // The BoundKey is built from the non-federation args only: the
            // fed op/tags/origin are internally added properties controlling
            // binding propagation, not routing, and with x-match 'all' they
            // would prevent matching.
            Binding::shared_ptr created(new Binding(bindingKey, queue, this, args ? *args : FieldTable()));
            BoundKey candidate(created, routingArgs);

            // NOTE(review): `candidate` is the local object handed to
            // add_unless(); whether the addOrigin() calls below are visible
            // through the stored entry depends on how BoundKey/fedBinding
            // are copied -- confirm.
            if (!bindings.add_unless(candidate, MatchArgs(queue, &routingArgs))) {
                // Not added: record the origin and report failure.
                candidate.fedBinding.addOrigin(queue->getName(), originName);
                return false;
            }

            created->startManagement();
            propagate = candidate.fedBinding.addOrigin(queue->getName(), originName);
            if (mgmtExchange != 0) {
                mgmtExchange->inc_bindingCount();
            }
        } // lock dropped

    } else if (opName == fedOpUnbind) {
        Mutex::ScopedLock guard(lock);

        // The modifier decides both whether to propagate and whether the
        // binding itself has to be removed.
        FedUnbindModifier unbindModifier(queue->getName(), originName);
        bindings.modify_if(MatchKey(queue, bindingKey), unbindModifier);
        propagate = unbindModifier.shouldPropagate;
        if (unbindModifier.shouldUnbind) {
            unbind(queue, bindingKey, args);
        }

    } else if (opName == fedOpReorigin) {
        Bindings::ConstPtr existing = bindings.snapshot();
        if (existing.get()) {
            Mutex::ScopedLock guard(lock);
            std::vector<BoundKey>::const_iterator it = existing->begin();
            for (; it != existing->end(); ++it) {
                if (it->fedBinding.hasLocal()) {
                    propagateFedOp(it->binding->key, string(), fedOpBind, string());
                }
            }
        }
    }

    routeIVE();

    if (propagate) {
        // Pass the non-federation args along only when there are any.
        FieldTable* forwardedArgs = 0;
        if (routingArgs.count() != 0) {
            forwardedArgs = &routingArgs;
        }
        propagateFedOp(bindingKey, tagList, opName, originName, forwardedArgs);
    }

    return true;
}