in src/qpid/messaging/amqp/AddressHelper.cpp [408:527]
void AddressHelper::checkAssertion(pn_terminus_t* terminus, CheckMode mode)
{
if (assertEnabled(mode)) {
QPID_LOG(debug, "checking capabilities: " << capabilities);
//ensure all desired capabilities have been offered
std::set<std::string> desired;
for (Variant::List::const_iterator i = capabilities.begin(); i != capabilities.end(); ++i) {
if (*i != CREATE_ON_DEMAND) desired.insert(i->asString());
}
pn_data_t* data = pn_terminus_capabilities(terminus);
if (pn_data_next(data)) {
pn_type_t type = pn_data_type(data);
if (type == PN_ARRAY) {
pn_data_enter(data);
while (pn_data_next(data)) {
desired.erase(convert(pn_data_get_symbol(data)));
}
pn_data_exit(data);
} else if (type == PN_SYMBOL) {
desired.erase(convert(pn_data_get_symbol(data)));
} else {
QPID_LOG(error, "Skipping capabilities field of type " << pn_type_name(type));
}
}
if (desired.size()) {
std::stringstream missing;
missing << "Desired capabilities not met: ";
bool first(true);
for (std::set<std::string>::const_iterator i = desired.begin(); i != desired.end(); ++i) {
if (first) first = false;
else missing << ", ";
missing << *i;
}
throw qpid::messaging::AssertionFailed(missing.str());
}
//ensure all desired filters are in use
data = pn_terminus_filter(terminus);
if (pn_data_next(data)) {
size_t count = pn_data_get_map(data);
pn_data_enter(data);
for (size_t i = 0; i < count && pn_data_next(data); ++i) {
//skip key:
if (!pn_data_next(data)) break;
//expecting described value:
if (pn_data_is_described(data)) {
pn_data_enter(data);
pn_data_next(data);
if (pn_data_type(data) == PN_ULONG) {
confirmFilter(pn_data_get_ulong(data));
} else if (pn_data_type(data) == PN_SYMBOL) {
confirmFilter(convert(pn_data_get_symbol(data)));
}
pn_data_exit(data);
}
}
pn_data_exit(data);
}
std::stringstream missing;
missing << "Desired filters not in use: ";
bool first(true);
for (std::vector<Filter>::iterator i = filters.begin(); i != filters.end(); ++i) {
if (!i->confirmed) {
if (first) first = false;
else missing << ", ";
missing << i->name << "(";
if (i->descriptorSymbol.empty()) missing << "0x" << std::hex << i->descriptorCode;
else missing << i->descriptorSymbol;
missing << ")";
}
}
if (!first) throw qpid::messaging::AssertionFailed(missing.str());
//assert on properties (Note: this violates the AMQP 1.0
//specification - as does the create option - by sending
//node-properties even if the dynamic option is not
//set. However this can be avoided by not specifying any node
//properties when asserting)
if (!type.empty() || durableNode || !properties.empty()) {
bool isAutoDeleted = false;
qpid::types::Variant::Map requested = properties;
if (!type.empty()) requested[SUPPORTED_DIST_MODES] = type == TOPIC ? COPY : MOVE;
if (durableNode) requested[DURABLE] = true;
data = pn_terminus_properties(terminus);
if (pn_data_next(data)) {
size_t count = pn_data_get_map(data);
pn_data_enter(data);
for (size_t i = 0; i < count && pn_data_next(data); ++i) {
std::string key = convert(pn_data_get_symbol(data));
pn_data_next(data);
qpid::types::Variant::Map::const_iterator j = requested.find(key);
qpid::types::Variant v;
if (key == LIFETIME_POLICY) {
isAutoDeleted = true;
if (j != requested.end() && checkLifetimePolicy(j->second.asString(), data)) {
requested.erase(j->first);
}
} else if (key == AUTO_DELETE) {
PnData(data).get(v);
isAutoDeleted = v.asBool();
} else if (j != requested.end() && (PnData(data).get(v) && v.asString() == j->second.asString())) {
requested.erase(j->first);
}
}
pn_data_exit(data);
qpid::types::Variant::Map::iterator i = requested.find(AUTO_DELETE);
if (i != requested.end() && i->second.asBool() == isAutoDeleted) {
requested.erase(i);
}
if (!requested.empty()) {
std::stringstream missing;
missing << "Requested node properties not met: " << requested;
throw qpid::messaging::AssertionFailed(missing.str());
}
}
}
}
}