void DispatchHandle::processEvent()

in src/qpid/sys/DispatchHandle.cpp [245:350]


/**
 * Handle one poller event delivered for this handle.
 *
 * The work is split into three steps:
 *  1. Under stateLock: decide from the current state whether the event may
 *     be processed (WAITING -> CALLING), must be ignored (IDLE), or whether
 *     the handle is being deleted (DELETING). The contents of
 *     interruptedCallbacks are swapped into callbacks so that only requests
 *     queued before this point are run by this invocation.
 *  2. Without the lock: invoke the callback(s) selected by `type`, then
 *     drain `callbacks`. A std::exception thrown by any callback is logged
 *     and ends this step; other exception types are not caught here.
 *  3. Under stateLock: CALLING -> WAITING, or STOPPING -> IDLE. If the state
 *     has become DELETING the object deletes itself.
 *
 * @param type the kind of event being reported for this handle
 */
void DispatchHandle::processEvent(Poller::EventType type) {

    // Phase I
    {
    ScopedLock<Mutex> lock(stateLock);
    
    switch(state) {
    case IDLE:
        // Can get here if a non connection thread stops watching
        // whilst we were stuck in the above lock 
        return;
    case WAITING:
        state = CALLING;
        break;
    case CALLING:
        // Not expected on entry: asserts in debug builds, otherwise ignored
        assert(state!=CALLING);
        return;
    case STOPPING:
        // Not expected on entry: asserts in debug builds, otherwise ignored
        assert(state!=STOPPING);
        return;
    case DELETING:
        // Need to make sure we clean up any pending callbacks in this case
        std::swap(callbacks, interruptedCallbacks);
        // Leaving this scope via goto still releases the lock before the delete
        goto saybyebye;
    }
    
    // Take over everything queued so far; anything queued from now on
    // lands in interruptedCallbacks and is left for a later event.
    std::swap(callbacks, interruptedCallbacks);
    }

    // Do callbacks - whilst we are doing the callbacks we are prevented from processing
    // the same handle until we re-enable it. To avoid re-entering the callbacks for a single
    // handle, re-enabling in the callbacks is actually deferred until they are complete.
    try {
    switch (type) {
    case Poller::READABLE:
        readableCallback(*this);
        break;
    case Poller::WRITABLE:
        writableCallback(*this);
        break;
    case Poller::READ_WRITABLE:
        readableCallback(*this);
        writableCallback(*this);
        break;
    case Poller::DISCONNECTED:
        // The disconnected callback is optional
        if (disconnectedCallback) {
            disconnectedCallback(*this);
        }
        break;
    case Poller::INTERRUPTED:
        {
        // We'll actually do the interrupt below
        }
        break;
    default:
        // Unknown event type
        assert(false);
    }

    // If we have any callbacks do them now -
    // (because we use a copy from before the previous callbacks we won't
    //  do anything yet that was just added) 
    while (callbacks.size() > 0) {
        {
        // Stop draining as soon as deletion has been requested
        ScopedLock<Mutex> lock(stateLock);
        switch (state) {
        case DELETING:
            goto finishcallbacks;
        default:
            break;
        }
        }
        // Copy the entry and remove it from the queue *before* invoking it.
        // If the callback throws we leave the loop through the handler
        // below; an entry still at the front of the queue would then be
        // handed back to interruptedCallbacks by the swap on the next event
        // and the failing callback would be queued to run again.
        Callback cb = callbacks.front();
        assert(cb);
        callbacks.pop();
        cb(*this);
    }
    } catch (const std::exception& e) {
        // One of the callbacks threw an exception - that's not allowed
        // NOTE(review): `state` is read here without holding stateLock,
        // unlike every other access in this function - confirm acceptable.
        QPID_LOG(error, "Caught exception in state: " << state << " with event: " << type << ": " << e.what());
        // It would be nice to clean up and delete ourselves here, but we can't
    }

finishcallbacks:
    {
    // Phase III - settle the state now that the callbacks are finished
    ScopedLock<Mutex> lock(stateLock);
    switch (state) {
    case IDLE:
        // Not expected while callbacks were running
        assert(state!=IDLE);
        return;
    case STOPPING:
        // A stop was requested while we were calling; complete it now
        state = IDLE;
        return;
    case WAITING:
        // Not expected while callbacks were running
        assert(state!=WAITING);
        return;
    case CALLING:
        // Normal case: ready for the next event
        state = WAITING;
        return;
    case DELETING:
        // Fall out of the locked scope and delete below
        break;
    }
    }

saybyebye:
    // The lock has been released; no member may be touched after this point
    delete this;
}