bool Queue::getNextMessage()

in src/qpid/broker/Queue.cpp [430:507]


/**
 * Look for the next message on this queue that consumer c can take.
 *
 * Each pass of the loop takes messageLock, remembers c's cursor, and asks
 * the message container for the next message relative to c. The candidate
 * is then handled as one of:
 *   - no message:   c is marked stopped and registered as a listener; stop.
 *   - expired:      the message is dequeued (and removed from the store if
 *                   persistent), TTL discard counters are bumped; try again.
 *   - filtered out: c does not want it; try the next message.
 *   - not accepted: c's cursor is restored so the message is retried later;
 *                   stop.
 *   - accepted:     acquired first if c pre-acquires (a failed acquire moves
 *                   on to the next message), then copied into m; stop.
 *
 * Listeners gathered along the way are notified once the loop has exited,
 * i.e. after messageLock has been released.
 *
 * @param m  receives a copy of the selected message when one is found
 * @param c  the consumer requesting a message; its cursor is advanced
 * @return true if a message was copied into m, false otherwise (including
 *         when checkNotDeleted(c) fails)
 */
bool Queue::getNextMessage(Message& m, Consumer::shared_ptr& c)
{
    if (!checkNotDeleted(c)) return false;

    QueueListeners::NotificationSet toNotify;
    ScopedAutoDelete autoDeleteGuard(*this);
    bool delivered(false);

    for (;;) {
        //TODO: reduce lock scope
        // The lock lives for one pass only: every continue/break below releases it.
        Mutex::ScopedLock guard(messageLock);
        QueueCursor savedPosition = c->getCursor(); // Position before advancing.
        Message* candidate = messages->next(*c);    // Moves c's cursor forward.

        // Nothing left for this consumer: park it until it is notified.
        if (!candidate) {
            QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
            c->stopped();
            listeners.addListener(c);
            break;
        }

        // Expired messages are discarded here and the search carries on.
        if (isExpired(name, *candidate, sys::AbsTime::now())) {
            QPID_LOG(debug, "Message expired from queue '" << name << "'");
            observeDequeue(*candidate, guard, settings.autodelete ? &autoDeleteGuard : 0);
            //ERROR: don't hold lock across call to store!!
            if (candidate->isPersistent()) dequeueFromStore(candidate->getPersistentContext());
            if (mgmtObject) {
                mgmtObject->inc_discardsTtl();
                if (brokerMgmtObject)
                    brokerMgmtObject->inc_discardsTtl();
            }
            messages->deleted(*c);
            continue;
        }

        // The consumer will never want this message; look at the next one.
        if (!c->filter(*candidate)) {
            QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
            if (c->preAcquires()) {
                // Give other listeners a chance to take this one.
                listeners.populate(toNotify);
            }
            continue;
        }

        // Messages are available but the consumer lacks the credit to take one now.
        if (!c->accept(*candidate)) {
            QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'");
            c->setCursor(savedPosition); // Rewind so it is retried once credit arrives.
            if (c->preAcquires()) {
                // Give other listeners a chance in the meantime.
                listeners.populate(toNotify);
            }
            break;
        }

        // Pre-acquiring consumers must win the message before it is handed over.
        if (c->preAcquires()) {
            QPID_LOG(debug, "Attempting to acquire message " << candidate->getSequence()
                     << " from '" << name << "' with state " << candidate->getState());
            if (!allocator->acquire(c->getName(), *candidate)) {
                QPID_LOG(debug, "Could not acquire message from '" << name << "'");
                continue; // Try another message.
            }
            if (mgmtObject) {
                mgmtObject->inc_acquires();
                if (brokerMgmtObject)
                    brokerMgmtObject->inc_acquires();
            }
            observeAcquire(*candidate, guard);
            candidate->deliver();
        }

        QPID_LOG(debug, "Message " << candidate->getSequence() << " retrieved from '"
                 << name << "'");
        m = *candidate;
        delivered = true;
        break;
    }

    toNotify.notify();
    return delivered;
}