boost::shared_ptr<Queue> QueueFactory::create()

in src/qpid/broker/QueueFactory.cpp [49:119]


// Builds and wires up a queue called 'name' according to 'settings'.
//
// The settings are validated before anything else happens. After that the
// work proceeds in a fixed order: choose the concrete Queue class, choose the
// message container, attach a message distributor, then register the
// threshold-alert and flow-control observers. The resulting queue is returned
// by shared pointer.
boost::shared_ptr<Queue> QueueFactory::create(const std::string& name, const QueueSettings& settings)
{
    settings.validate();
    // Obtained up front, but only hooked onto the queue in the final step.
    boost::shared_ptr<QueueFlowLimit> flowLimit(QueueFlowLimit::createLimit(name, settings));

    // A non-empty LVQ key marks the queue as a last-value queue.
    const bool isLvq = !settings.lvqKey.empty();

    // Step 1: choose the Queue class. In every case the message store is only
    // handed over when the queue is durable; otherwise a null store is passed.
    boost::shared_ptr<Queue> queue;
    if (settings.dropMessagesAtLimit && isLvq) {
        // ring policy combined with LVQ
        std::auto_ptr<MessageMap> lvqMessages(new MessageMap(settings.lvqKey));
        queue.reset(new LossyLvq(name, lvqMessages, settings, settings.durable ? store : 0, parent, broker));
    } else if (settings.dropMessagesAtLimit) {
        // plain ring policy
        queue.reset(new LossyQueue(name, settings, settings.durable ? store : 0, parent, broker));
    } else if (settings.selfDestructAtLimit) {
        queue.reset(new SelfDestructQueue(name, settings, settings.durable ? store : 0, parent, broker));
    } else if (isLvq) {
        std::auto_ptr<MessageMap> lvqMessages(new MessageMap(settings.lvqKey));
        queue.reset(new Lvq(name, lvqMessages, settings, settings.durable ? store : 0, parent, broker));
    } else {
        queue.reset(new Queue(name, settings, settings.durable ? store : 0, parent, broker));
    }

    // Step 2: choose the Messages container (the storage structure).
    if (settings.priorities) {
        const bool wantsFairshare = settings.defaultFairshare || settings.fairshare.size();
        if (wantsFairshare) {
            queue->messages = Fairshare::create(settings);
        } else {
            queue->messages = std::auto_ptr<Messages>(new PriorityQueue(settings.priorities));
        }
    } else if (settings.paging) {
        // Paging is best effort: if any prerequisite is missing, a warning is
        // logged and queue->messages is left exactly as step 1 set it up.
        if (!broker) {
            QPID_LOG(warning, "Cannot create paged queue without broker context");
        } else if (!qpid::sys::MemoryMappedFile::isSupported()) {
            QPID_LOG(warning, "Cannot create paged queue; memory mapped file support not available on this platform");
        } else if (!broker->getPagingDir().isEnabled()) {
            QPID_LOG(warning, "Cannot create paged queue; no paging directory enabled");
        } else {
            // Zero-valued page settings fall back to the DEFAULT_* constants.
            queue->messages = std::auto_ptr<Messages>(
                new PagedQueue(name,
                               broker->getPagingDir().getPath(),
                               settings.maxPages ? settings.maxPages : DEFAULT_MAX_PAGES,
                               settings.pageFactor ? settings.pageFactor : DEFAULT_PAGE_FACTOR,
                               broker->getProtocolRegistry()));
        }
    } else if (!isLvq) {
        // The LVQ queues already received their MessageMap in step 1.
        queue->messages = std::auto_ptr<Messages>(new MessageDeque());
    }

    // Step 3: choose the MessageDistributor. It is bound to the container
    // selected above, so this must come after step 2.
    if (settings.groupKey.size() == 0) {
        boost::shared_ptr<MessageDistributor> fifo(new FifoDistributor(*(queue->messages)));
        queue->allocator = fifo;
    } else {
        // The group manager serves as the distributor and is also added to
        // the queue's observers.
        boost::shared_ptr<MessageGroupManager> groupManager(
            MessageGroupManager::create(name, *(queue->messages), settings));
        queue->allocator = groupManager;
        queue->getObservers().add(groupManager);
    }

    // Step 4: threshold events, only when a broker with a management agent
    // is available.
    if (broker && broker->getManagementAgent()) {
        ThresholdAlerts::observe(*queue,
                                 *(broker->getManagementAgent()),
                                 settings,
                                 broker->getQueueThresholdEventRatio());
    }

    // Step 5: flow control, only when createLimit() produced a limit object.
    if (flowLimit) {
        flowLimit->observe(*queue);
    }

    return queue;
}