in src/qpid/broker/Broker.cpp [836:1027]
void Broker::createObject(const std::string& type, const std::string& name,
const Variant::Map& properties, bool /*strict*/, const ConnectionIdentity* context)
{
std::string userId;
std::string connectionId;
if (context) {
userId = context->getUserId();
connectionId = context->getMgmtId();
}
//TODO: implement 'strict' option (check there are no unrecognised properties)
QPID_LOG (debug, "Broker::create(" << type << ", " << name << "," << properties << ")");
if (objectFactory.createObject(*this, type, name, properties, userId, connectionId)) {
QPID_LOG (debug, "Broker::create(" << type << ", " << name << "," << properties << ") handled by registered factory");
} else if (type == TYPE_QUEUE) {
bool durable(false);
bool autodelete(false);
std::string alternateExchange;
Variant::Map extensions;
for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {
// extract durable, auto-delete and alternate-exchange properties
if (i->first == DURABLE) durable = i->second;
else if (i->first == AUTO_DELETE) autodelete = i->second;
else if (i->first == ALTERNATE_EXCHANGE) alternateExchange = i->second.asString();
//treat everything else as extension properties
else extensions[i->first] = i->second;
}
QueueSettings settings(durable, autodelete);
Variant::Map unused;
settings.populate(extensions, unused);
qpid::amqp_0_10::translate(unused, settings.storeSettings);
//TODO: unused doesn't take store settings into account... so can't yet implement strict
QPID_LOG(debug, "Broker did not use the following settings (store module may): " << unused);
std::pair<boost::shared_ptr<Queue>, bool> result =
createQueue(name, settings, 0, alternateExchange, userId, connectionId);
if (!result.second) {
throw ObjectAlreadyExists(name);
}
} else if (type == TYPE_EXCHANGE || type == TYPE_TOPIC) {
bool durable(false);
bool autodelete(false);
std::string exchangeType("topic");
std::string alternateExchange;
Variant::Map extensions;
for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {
// extract durable, auto-delete and alternate-exchange properties
if (i->first == DURABLE) durable = i->second;
else if (i->first == AUTO_DELETE) autodelete = i->second;
else if (i->first == EXCHANGE_TYPE) exchangeType = i->second.asString();
else if (i->first == ALTERNATE_EXCHANGE) alternateExchange = i->second.asString();
//treat everything else as extension properties
else extensions[i->first] = i->second;
}
framing::FieldTable arguments;
qpid::amqp_0_10::translate(extensions, arguments);
try {
std::pair<boost::shared_ptr<Exchange>, bool> result =
createExchange(name, exchangeType, durable, autodelete, alternateExchange, arguments, userId, connectionId);
if (!result.second) {
throw ObjectAlreadyExists(name);
}
} catch (const UnknownExchangeTypeException&) {
throw Exception(QPID_MSG("Invalid exchange type: " << exchangeType));
}
} else if (type == TYPE_BINDING) {
BindingIdentifier binding(name);
std::string exchangeType("topic");
Variant::Map extensions;
for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {
// extract durable, auto-delete and alternate-exchange properties
if (i->first == EXCHANGE_TYPE) exchangeType = i->second.asString();
//treat everything else as extension properties
else extensions[i->first] = i->second;
}
framing::FieldTable arguments;
qpid::amqp_0_10::translate(extensions, arguments);
bind(binding.queue, binding.exchange, binding.key, arguments, 0, userId, connectionId);
} else if (type == TYPE_LINK) {
QPID_LOG (debug, "createObject: Link; name=" << name << "; args=" << properties );
if (name.compare(0, QPID_NAME_PREFIX.length(), QPID_NAME_PREFIX) == 0) {
QPID_LOG(error, "Link name='" << name << "' cannot use the reserved prefix '" << QPID_NAME_PREFIX << "'");
throw ReservedObjectName(name);
}
std::string host;
uint16_t port = 0;
std::string transport = TCP_TRANSPORT;
bool durable = false;
std::string authMech, username, password;
for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {
if (i->first == HOST) host = i->second.asString();
else if (i->first == PORT) port = i->second.asUint16();
else if (i->first == TRANSPORT) transport = i->second.asString();
else if (i->first == DURABLE) durable = bool(i->second);
else if (i->first == AUTH_MECHANISM) authMech = i->second.asString();
else if (i->first == USERNAME) username = i->second.asString();
else if (i->first == PASSWORD) password = i->second.asString();
else {
// TODO: strict checking here
}
}
if (!getTransportInfo(transport).connectorFactory) {
QPID_LOG(error, "Transport '" << transport << "' not supported.");
throw UnsupportedTransport(transport);
}
std::pair<boost::shared_ptr<Link>, bool> rc;
rc = links.declare(name, host, port, transport, durable, authMech, username, password);
if (!rc.first) {
QPID_LOG (error, "Failed to create Link object, name=" << name << " remote=" << host << ":" << port <<
"; transport=" << transport << "; durable=" << (durable?"T":"F") << "; authMech=\"" << authMech << "\"");
throw InvalidParameter(name);
}
if (!rc.second) {
QPID_LOG (error, "Failed to create a new Link object, name=" << name << " already exists.");
throw ObjectAlreadyExists(name);
}
} else if (type == TYPE_BRIDGE) {
QPID_LOG (debug, "createObject: Bridge; name=" << name << "; args=" << properties );
if (name.compare(0, QPID_NAME_PREFIX.length(), QPID_NAME_PREFIX) == 0) {
QPID_LOG(error, "Bridge name='" << name << "' cannot use the reserved prefix '" << QPID_NAME_PREFIX << "'");
throw ReservedObjectName(name);
}
std::string linkName;
std::string src;
std::string dest;
std::string key;
std::string id;
std::string excludes;
std::string queueName;
bool durable = false;
bool srcIsQueue = false;
bool srcIsLocal = false;
bool dynamic = false;
uint16_t sync = 0;
uint32_t credit = LinkRegistry::INFINITE_CREDIT;
for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {
if (i->first == LINK) linkName = i->second.asString();
else if (i->first == SRC) src = i->second.asString();
else if (i->first == DEST) dest = i->second.asString();
else if (i->first == KEY) key = i->second.asString();
else if (i->first == TAG) id = i->second.asString();
else if (i->first == EXCLUDES) excludes = i->second.asString();
else if (i->first == SRC_IS_QUEUE) srcIsQueue = bool(i->second);
else if (i->first == SRC_IS_LOCAL) srcIsLocal = bool(i->second);
else if (i->first == DYNAMIC) dynamic = bool(i->second);
else if (i->first == SYNC) sync = i->second.asUint16();
else if (i->first == CREDIT) credit = i->second.asUint32();
else if (i->first == DURABLE) durable = bool(i->second);
else if (i->first == QUEUE_NAME) queueName = i->second.asString();
else {
// TODO: strict checking here
}
}
boost::shared_ptr<Link> link;
if (linkName.empty() || !(link = links.getLink(linkName))) {
QPID_LOG(error, "Link '" << linkName << "' not found; bridge create failed.");
throw InvalidParameter(name);
}
std::pair<Bridge::shared_ptr, bool> rc =
links.declare(name, *link, durable, src, dest, key, srcIsQueue, srcIsLocal, id, excludes,
dynamic, sync, credit,
0,
queueName);
if (!rc.first) {
QPID_LOG (error, "Failed to create Bridge object, name=" << name << " link=" << linkName <<
"; src=" << src << "; dest=" << dest << "; key=" << key);
throw InvalidParameter(name);
}
if (!rc.second) {
QPID_LOG (error, "Failed to create a new Bridge object, name=" << name << " already exists.");
throw ObjectAlreadyExists(name);
}
} else {
throw UnknownObjectType(type);
}
}