in src/qpid/sys/DispatchHandle.cpp [245:350]
/**
 * Process a single poller event for this handle.
 *
 * The work is split into three phases:
 *  - Phase I (under stateLock): inspect the state. WAITING becomes CALLING
 *    and processing continues; IDLE returns immediately; DELETING skips
 *    straight to deleting this object. On every path that does not return,
 *    the callbacks and interruptedCallbacks queues are swapped.
 *  - Phase II (stateLock not held while callbacks run): invoke the callback
 *    selected by @p type, then drain the callbacks queue, stopping early if
 *    the state has become DELETING.
 *  - Phase III (under stateLock): CALLING becomes WAITING, STOPPING becomes
 *    IDLE, and DELETING falls through to delete this object.
 *
 * Exceptions derived from std::exception that escape a callback are logged
 * and swallowed; Phase III still runs afterwards.
 *
 * This function may execute `delete this`, so the object must not be touched
 * by the caller after it returns.
 *
 * @param type the kind of event being reported for this handle.
 */
void DispatchHandle::processEvent(Poller::EventType type) {
    // Phase I
    {
        ScopedLock<Mutex> lock(stateLock);
        switch(state) {
        case IDLE:
            // Can get here if a non connection thread stops watching
            // whilst we were stuck in the above lock
            return;
        case WAITING:
            state = CALLING;
            break;
        case CALLING:
            // Always fires in a debug build: we should never be entered
            // while already CALLING.
            assert(state!=CALLING);
            return;
        case STOPPING:
            // Always fires in a debug build: we should never be entered
            // while STOPPING.
            assert(state!=STOPPING);
            return;
        case DELETING:
            // Need to make sure we clean up any pending callbacks in this case
            std::swap(callbacks, interruptedCallbacks);
            // Leaving this scope via goto releases the lock before the delete.
            goto saybyebye;
        }
        std::swap(callbacks, interruptedCallbacks);
    }
    // Do callbacks - whilst we are doing the callbacks we are prevented from processing
    // the same handle until we re-enable it. To avoid re-entering the callbacks for a single
    // handle re-enabling in the callbacks is actually deferred until they are complete.
    try {
        switch (type) {
        case Poller::READABLE:
            readableCallback(*this);
            break;
        case Poller::WRITABLE:
            writableCallback(*this);
            break;
        case Poller::READ_WRITABLE:
            readableCallback(*this);
            writableCallback(*this);
            break;
        case Poller::DISCONNECTED:
            // The disconnected callback is optional, unlike the others.
            if (disconnectedCallback) {
                disconnectedCallback(*this);
            }
            break;
        case Poller::INTERRUPTED:
            {
                // We'll actually do the interrupt below
            }
            break;
        default:
            assert(false);
        }
        // If we have any callbacks do them now -
        // (because we use a copy from before the previous callbacks we won't
        // do anything yet that was just added)
        while (callbacks.size() > 0) {
            {
                // Re-check the state before every callback: once the handle
                // is DELETING no further queued callbacks are run.
                ScopedLock<Mutex> lock(stateLock);
                switch (state) {
                case DELETING:
                    goto finishcallbacks;
                default:
                    break;
                }
            }
            // The lock above has been released; the callback runs unlocked.
            Callback cb = callbacks.front();
            assert(cb);
            cb(*this);
            // NOTE(review): the entry is popped only after cb returns, so a
            // callback that throws stays at the front of the queue - confirm
            // that this is intended.
            callbacks.pop();
        }
    } catch (const std::exception& e) {
        // One of the callbacks threw an exception - that's not allowed.
        // No lock is held here (the only lock taken inside the try block is
        // released before a callback is invoked), so take stateLock to read
        // state consistently with every other access in this function.
        ScopedLock<Mutex> lock(stateLock);
        QPID_LOG(error, "Caught exception in state: " << state << " with event: " << type << ": " << e.what());
        // It would be nice to clean up and delete ourselves here, but we can't
    }
finishcallbacks:
    // Phase III
    {
        ScopedLock<Mutex> lock(stateLock);
        switch (state) {
        case IDLE:
            // Always fires in a debug build: IDLE is not expected here.
            assert(state!=IDLE);
            return;
        case STOPPING:
            state = IDLE;
            return;
        case WAITING:
            // Always fires in a debug build: WAITING is not expected here.
            assert(state!=WAITING);
            return;
        case CALLING:
            state = WAITING;
            return;
        case DELETING:
            // Fall out of the locked scope so the lock is released before
            // the object (and its mutex) is destroyed.
            break;
        }
    }
saybyebye:
    delete this;
}