void Broker::createObject()

in src/qpid/broker/Broker.cpp [836:1027]


void Broker::createObject(const std::string& type, const std::string& name,
                          const Variant::Map& properties, bool /*strict*/, const ConnectionIdentity* context)
{
    std::string userId;
    std::string connectionId;
    if (context) {
        userId = context->getUserId();
        connectionId = context->getMgmtId();
    }
    //TODO: implement 'strict' option (check there are no unrecognised properties)
    QPID_LOG (debug, "Broker::create(" << type << ", " << name << "," << properties << ")");
    if (objectFactory.createObject(*this, type, name, properties, userId, connectionId)) {
        QPID_LOG (debug, "Broker::create(" << type << ", " << name << "," << properties << ") handled by registered factory");
    } else if (type == TYPE_QUEUE) {
        bool durable(false);
        bool autodelete(false);
        std::string alternateExchange;
        Variant::Map extensions;
        for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {
            // extract durable, auto-delete and alternate-exchange properties
            if (i->first == DURABLE) durable = i->second;
            else if (i->first == AUTO_DELETE) autodelete = i->second;
            else if (i->first == ALTERNATE_EXCHANGE) alternateExchange = i->second.asString();
            //treat everything else as extension properties
            else extensions[i->first] = i->second;
        }
        QueueSettings settings(durable, autodelete);
        Variant::Map unused;
        settings.populate(extensions, unused);
        qpid::amqp_0_10::translate(unused, settings.storeSettings);
        //TODO: unused doesn't take store settings into account... so can't yet implement strict
        QPID_LOG(debug, "Broker did not use the following settings (store module may): " << unused);

        std::pair<boost::shared_ptr<Queue>, bool> result =
            createQueue(name, settings, 0, alternateExchange, userId, connectionId);
        if (!result.second) {
            throw ObjectAlreadyExists(name);
        }
    } else if (type == TYPE_EXCHANGE || type == TYPE_TOPIC) {
        bool durable(false);
        bool autodelete(false);
        std::string exchangeType("topic");
        std::string alternateExchange;
        Variant::Map extensions;
        for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {
            // extract durable, auto-delete and alternate-exchange properties
            if (i->first == DURABLE) durable = i->second;
            else if (i->first == AUTO_DELETE) autodelete = i->second;
            else if (i->first == EXCHANGE_TYPE) exchangeType = i->second.asString();
            else if (i->first == ALTERNATE_EXCHANGE) alternateExchange = i->second.asString();
            //treat everything else as extension properties
            else extensions[i->first] = i->second;
        }
        framing::FieldTable arguments;
        qpid::amqp_0_10::translate(extensions, arguments);

        try {
            std::pair<boost::shared_ptr<Exchange>, bool> result =
                createExchange(name, exchangeType, durable, autodelete, alternateExchange, arguments, userId, connectionId);
            if (!result.second) {
                throw ObjectAlreadyExists(name);
            }
        } catch (const UnknownExchangeTypeException&) {
            throw Exception(QPID_MSG("Invalid exchange type: " << exchangeType));
        }
    } else if (type == TYPE_BINDING) {
        BindingIdentifier binding(name);
        std::string exchangeType("topic");
        Variant::Map extensions;
        for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {
            // extract durable, auto-delete and alternate-exchange properties
            if (i->first == EXCHANGE_TYPE) exchangeType = i->second.asString();
            //treat everything else as extension properties
            else extensions[i->first] = i->second;
        }
        framing::FieldTable arguments;
        qpid::amqp_0_10::translate(extensions, arguments);

        bind(binding.queue, binding.exchange, binding.key, arguments, 0, userId, connectionId);

    } else if (type == TYPE_LINK) {

        QPID_LOG (debug, "createObject: Link; name=" << name << "; args=" << properties );

        if (name.compare(0, QPID_NAME_PREFIX.length(), QPID_NAME_PREFIX) == 0) {
            QPID_LOG(error, "Link name='" << name << "' cannot use the reserved prefix '" << QPID_NAME_PREFIX << "'");
            throw ReservedObjectName(name);
        }

        std::string host;
        uint16_t port = 0;
        std::string transport = TCP_TRANSPORT;
        bool durable = false;
        std::string authMech, username, password;

        for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {
            if (i->first == HOST) host = i->second.asString();
            else if (i->first == PORT) port = i->second.asUint16();
            else if (i->first == TRANSPORT) transport = i->second.asString();
            else if (i->first == DURABLE) durable = bool(i->second);
            else if (i->first == AUTH_MECHANISM) authMech = i->second.asString();
            else if (i->first == USERNAME) username = i->second.asString();
            else if (i->first == PASSWORD) password = i->second.asString();
            else {
                // TODO: strict checking here
            }
        }

        if (!getTransportInfo(transport).connectorFactory) {
            QPID_LOG(error, "Transport '" << transport << "' not supported.");
            throw UnsupportedTransport(transport);
        }

        std::pair<boost::shared_ptr<Link>, bool> rc;
        rc = links.declare(name, host, port, transport, durable, authMech, username, password);
        if (!rc.first) {
            QPID_LOG (error, "Failed to create Link object, name=" << name << " remote=" << host << ":" << port <<
                      "; transport=" << transport << "; durable=" << (durable?"T":"F") << "; authMech=\"" << authMech << "\"");
            throw InvalidParameter(name);
        }
        if (!rc.second) {
            QPID_LOG (error, "Failed to create a new Link object, name=" << name << " already exists.");
            throw ObjectAlreadyExists(name);
        }

    } else if (type == TYPE_BRIDGE) {

        QPID_LOG (debug, "createObject: Bridge; name=" << name << "; args=" << properties );

        if (name.compare(0, QPID_NAME_PREFIX.length(), QPID_NAME_PREFIX) == 0) {
            QPID_LOG(error, "Bridge name='" << name << "' cannot use the reserved prefix '" << QPID_NAME_PREFIX << "'");
            throw ReservedObjectName(name);
        }

        std::string linkName;
        std::string src;
        std::string dest;
        std::string key;
        std::string id;
        std::string excludes;
        std::string queueName;
        bool durable = false;
        bool srcIsQueue = false;
        bool srcIsLocal = false;
        bool dynamic = false;
        uint16_t sync = 0;
        uint32_t credit = LinkRegistry::INFINITE_CREDIT;

        for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {

            if (i->first == LINK) linkName = i->second.asString();
            else if (i->first == SRC) src = i->second.asString();
            else if (i->first == DEST) dest = i->second.asString();
            else if (i->first == KEY) key = i->second.asString();
            else if (i->first == TAG) id = i->second.asString();
            else if (i->first == EXCLUDES) excludes = i->second.asString();
            else if (i->first == SRC_IS_QUEUE) srcIsQueue = bool(i->second);
            else if (i->first == SRC_IS_LOCAL) srcIsLocal = bool(i->second);
            else if (i->first == DYNAMIC) dynamic = bool(i->second);
            else if (i->first == SYNC) sync = i->second.asUint16();
            else if (i->first == CREDIT) credit = i->second.asUint32();
            else if (i->first == DURABLE) durable = bool(i->second);
            else if (i->first == QUEUE_NAME) queueName = i->second.asString();
            else {
                // TODO: strict checking here
            }
        }

        boost::shared_ptr<Link> link;
        if (linkName.empty() || !(link = links.getLink(linkName))) {
            QPID_LOG(error, "Link '" << linkName << "' not found; bridge create failed.");
            throw InvalidParameter(name);
        }
        std::pair<Bridge::shared_ptr, bool> rc =
          links.declare(name, *link, durable, src, dest, key, srcIsQueue, srcIsLocal, id, excludes,
                        dynamic, sync, credit,
                        0,
                        queueName);

        if (!rc.first) {
            QPID_LOG (error, "Failed to create Bridge object, name=" << name << " link=" << linkName <<
                      "; src=" << src << "; dest=" << dest << "; key=" << key);
            throw InvalidParameter(name);
        }
        if (!rc.second) {
            QPID_LOG (error, "Failed to create a new Bridge object, name=" << name << " already exists.");
            throw ObjectAlreadyExists(name);
        }
    } else {
        throw UnknownObjectType(type);
    }
}