in src/qpid/broker/Queue.cpp [430:507]
bool Queue::getNextMessage(Message& m, Consumer::shared_ptr& c)
{
if (!checkNotDeleted(c)) return false;
QueueListeners::NotificationSet set;
ScopedAutoDelete autodelete(*this);
bool messageFound(false);
while (true) {
//TODO: reduce lock scope
Mutex::ScopedLock locker(messageLock);
QueueCursor cursor = c->getCursor(); // Save current position.
Message* msg = messages->next(*c); // Advances c.
if (msg) {
if (isExpired(name, *msg, sys::AbsTime::now())) {
QPID_LOG(debug, "Message expired from queue '" << name << "'");
observeDequeue(*msg, locker, settings.autodelete ? &autodelete : 0);
//ERROR: don't hold lock across call to store!!
if (msg->isPersistent()) dequeueFromStore(msg->getPersistentContext());
if (mgmtObject) {
mgmtObject->inc_discardsTtl();
if (brokerMgmtObject)
brokerMgmtObject->inc_discardsTtl();
}
messages->deleted(*c);
continue;
}
if (c->filter(*msg)) {
if (c->accept(*msg)) {
if (c->preAcquires()) {
QPID_LOG(debug, "Attempting to acquire message " << msg->getSequence()
<< " from '" << name << "' with state " << msg->getState());
if (allocator->acquire(c->getName(), *msg)) {
if (mgmtObject) {
mgmtObject->inc_acquires();
if (brokerMgmtObject)
brokerMgmtObject->inc_acquires();
}
observeAcquire(*msg, locker);
msg->deliver();
} else {
QPID_LOG(debug, "Could not acquire message from '" << name << "'");
continue; //try another message
}
}
QPID_LOG(debug, "Message " << msg->getSequence() << " retrieved from '"
<< name << "'");
m = *msg;
messageFound = true;
break;
} else {
//message(s) are available but consumer hasn't got enough credit
QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'");
c->setCursor(cursor); // Restore cursor, will try again with credit
if (c->preAcquires()) {
//let someone else try
listeners.populate(set);
}
break;
}
} else {
//consumer will never want this message, try another one
QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
if (c->preAcquires()) {
//let someone else try to take this one
listeners.populate(set);
}
}
} else {
QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
c->stopped();
listeners.addListener(c);
break;
}
}
set.notify();
return messageFound;
}