Poller::Event Poller::wait()

in src/qpid/sys/solaris/ECFPoller.cpp [336:433]


/**
 * Wait on the event port until an event can be reported to the caller.
 *
 * @param timeout maximum time to wait, treated as a nanosecond count, or
 *                TIME_INFINITE to wait without a deadline.
 * @return one of:
 *   - Event(0, SHUTDOWN) if impl->isShutdown is set once port_get returns;
 *   - Event(0, TIMEOUT) if port_get reports ETIME, or if an event was
 *     consumed without being reported and the deadline has passed;
 *   - Event(wrappedHandle, INTERRUPTED) for an alert raised on the
 *     interrupt handle;
 *   - Event(handle, DISCONNECTED) for a repeated POLLHUP on a handle that
 *     is already marked hung up;
 *   - Event(handle, direction) for any other file descriptor event.
 *
 * Any other port_get/port_dissociate failure is passed to QPID_POSIX_CHECK
 * (presumably raising an exception - confirm against the macro definition).
 */
Poller::Event Poller::wait(Duration timeout) {
    timespec_t tout;
    timespec_t* ptout = NULL;
    port_event_t pe;

    AbsTime targetTimeout = (timeout == TIME_INFINITE) ? FAR_FUTURE :
        AbsTime(now(), timeout);

    if (timeout != TIME_INFINITE) {
        // tv_nsec must stay below one second, so split the nanosecond
        // count; otherwise port_get rejects timeouts of 1s or more.
        const long long nsecPerSec = 1000000000LL;
        const long long timeoutNs = timeout;
        tout.tv_sec = static_cast<time_t>(timeoutNs / nsecPerSec);
        tout.tv_nsec = static_cast<long>(timeoutNs % nsecPerSec);
        ptout = &tout;
    }

    do {
        PollerHandleDeletionManager.markAllUnusedInThisThread();
        QPID_LOG(trace, "About to enter port_get on " << impl->portId
                 << ". Thread " << pthread_self()
                 << ", timeout=" << timeout);

        int rc = ::port_get(impl->portId, &pe, ptout);
        // Capture errno immediately: the logging below may overwrite it.
        const int portGetErrno = errno;

        QPID_LOG(trace, "port_get on " << impl->portId
                 << " returned " << rc);

        if (impl->isShutdown) {
            PollerHandleDeletionManager.markAllUnusedInThisThread();
            return Event(0, SHUTDOWN);
        }

        if (rc < 0) {
            switch (portGetErrno) {
            case EINTR:
                // Interrupted by a signal: simply wait again.
                continue;
            case ETIME:
                // Same bookkeeping as the other exit paths.
                PollerHandleDeletionManager.markAllUnusedInThisThread();
                return Event(0, TIMEOUT);
            default:
                // Restore errno so the check reports the port_get failure.
                errno = portGetErrno;
                QPID_POSIX_CHECK(rc);
            }
        } else {
            PollerHandle* handle = static_cast<PollerHandle*>(pe.portev_user);
            PollerHandlePrivate& eh = *handle->impl;
            ScopedLock<Mutex> l(eh.lock);

            if (eh.isActive()) {
                QPID_LOG(trace, "Handle is active");
                //We use alert mode to notify interrupts
                if (pe.portev_source == PORT_SOURCE_ALERT &&
                    handle == &impl->interruptHandle) {
                    QPID_LOG(trace, "Interrupt notified");

                    PollerHandle* wrappedHandle = impl->interruptHandle.getHandle();

                    // Re-raise the interrupt while further handles are
                    // queued; otherwise the interrupt handle goes idle.
                    if (impl->interruptHandle.queuedHandles()) {
                        impl->interrupt();
                        eh.setActive();
                    } else {
                        eh.setInactive();
                    }
                    return Event(wrappedHandle, INTERRUPTED);
                }

                if (pe.portev_source == PORT_SOURCE_FD) {
                    QPID_LOG(trace, "About to send handle: " << handle);
                    if (pe.portev_events & POLLHUP) {
                        // The first hangup is reported as an ordinary
                        // event; a repeated one becomes DISCONNECTED.
                        if (eh.isHungup()) {
                            return Event(handle, DISCONNECTED);
                        }
                        eh.setHungup();
                    } else {
                        eh.setInactive();
                    }
                    QPID_LOG(trace, "Sending event (thread: "
                             << pthread_self() << ") for handle " << handle
                             << ", direction= "
                             << PollerPrivate::pollToDirection(pe.portev_events));
                    return Event(handle, PollerPrivate::pollToDirection(pe.portev_events));
                }
            } else if (eh.isDeleted()) {
                //Remove the handle from the poller
                const int dissociateRc =
                    ::port_dissociate(impl->portId, PORT_SOURCE_FD,
                                      static_cast<uintptr_t>(eh.fd));
                // ENOENT is tolerated as well as EBADFD: per the Solaris
                // documentation an fd association is dropped once its event
                // has been retrieved, so there may be nothing to remove.
                if (dissociateRc == -1 && errno != EBADFD && errno != ENOENT) {
                    QPID_POSIX_CHECK(dissociateRc);
                }
            }
        }

        // Reached only when an event was consumed without being reported.
        if (timeout == TIME_INFINITE) {
            continue;
        }
        if (rc == 0 && now() > targetTimeout) {
            PollerHandleDeletionManager.markAllUnusedInThisThread();
            return Event(0, TIMEOUT);
        }
    } while (true);
}