Poller::Event Poller::wait()

in src/qpid/sys/epoll/EpollPoller.cpp [549:679]


// Wait for the next event on this poller, blocking for at most `timeout`.
//
// Each pass of the loop asks epoll_wait for a single event and then decides
// whether it is a queued interrupt, a shutdown notification or activity on a
// regular handle.
//
// @param timeout maximum time to wait; TIME_INFINITE means keep waiting until
//                an event occurs (TIMEOUT is never returned in that case).
// @return one of:
//         - Event(handle, INTERRUPTED)  interrupt for the handle obtained from
//                                       interruptHandle.getHandle()
//         - Event(0, SHUTDOWN)          impl->isShutdown was set when a
//                                       non-interrupt event arrived
//         - Event(handle, DISCONNECTED) EPOLLHUP seen on a handle that was
//                                       already marked hung up
//         - Event(handle, direction)    activity on `handle`; direction comes
//                                       from PollerPrivate::epollToDirection()
//         - Event(0, TIMEOUT)           epoll_wait returned no event and the
//                                       target time has passed
Poller::Event Poller::wait(Duration timeout) {
    // Per-thread (__thread) record of the handle this thread returned from its
    // previous call; it is passed to impl->resetMode() on re-entry below.
    static __thread PollerHandlePrivate* lastReturnedHandle = 0;
    // Make sure lightly used threads regularly purge DeletionManager memory:
    // no single epoll_wait call is allowed to block longer than this, so the
    // loop gets back to markAllUnusedInThisThread() at least this often.
    static const Duration maxEpollWait = 60 * TIME_SEC;
    epoll_event epe;
    // Absolute deadline for the whole call (FAR_FUTURE for an infinite wait).
    AbsTime targetTimeout = 
        (timeout == TIME_INFINITE) ?
            FAR_FUTURE :
            AbsTime(now(), timeout); 

    // The caller coming back into wait() is taken as the signal that it has
    // finished with the previously returned handle.
    // NOTE(review): resetMode() is not visible here; presumably it re-arms the
    // handle in epoll according to its current state - confirm.
    if (lastReturnedHandle) {
        impl->resetMode(*lastReturnedHandle);
        lastReturnedHandle = 0;
    }

    // Repeat until we weren't interrupted by signal
    do {
        PollerHandleDeletionManager.markAllUnusedInThisThread();
        // Work out how long (in milliseconds) this epoll_wait call may block.
        int timeoutMs;
        AbsTime now_(now());
        if (timeout == TIME_INFINITE) {
            // Infinite wait: still wake up every maxEpollWait (see above).
            timeoutMs = maxEpollWait / TIME_MSEC;
        } else if (now_ > targetTimeout || now_ == targetTimeout) {
                // Deadline already reached: just poll without blocking.
                timeoutMs = 0;
        } else {
            // Account for truncation when converting to millisecs.
            // Adding TIME_MSEC - 1 before the integer division rounds the
            // remaining time up to a whole millisecond rather than down.
            Duration remaining(now_, AbsTime(targetTimeout, TIME_MSEC - 1));
            timeoutMs = std::min(remaining, maxEpollWait) / TIME_MSEC;
        }

        // Ask for at most one event (maxevents == 1).
        int rc = ::epoll_wait(impl->epollFd, &epe, 1, timeoutMs);
        if (rc ==-1 && errno != EINTR) {
            // Real failure (anything other than a signal interruption).
            // NOTE(review): QPID_POSIX_CHECK is defined elsewhere; presumably
            // it throws for a negative result - confirm.
            QPID_POSIX_CHECK(rc);
        } else if (rc > 0) {
            // Only one event was requested, so only one can be reported.
            assert(rc == 1);
            void* dataPtr = epe.data.ptr;

            // Check if this is an interrupt
            PollerPrivate::InterruptHandle& interruptHandle = impl->interruptHandle;
            if (dataPtr == &interruptHandle) {
                // If we are shutting down we need to rearm the shutdown interrupt to
                // ensure everyone still sees it. It's okay that this might be overridden
                // below as we will be back here if it is.
                if (impl->isShutdown) {
                    impl->interruptAll();
                }
                // Take the handle to be interrupted (if any) while holding the
                // interrupt handle's lock; the lock is released before the
                // wrapped handle's own lock is taken below.
                PollerHandle* wrappedHandle = 0;
                {
                ScopedLock<Mutex> l(interruptHandle.impl->lock);
                if (interruptHandle.impl->isActive()) {
                    wrappedHandle = interruptHandle.getHandle();
                    // If there is an interrupt queued behind this one we need to arm it
                    // We do it this way so that another thread can pick it up
                    if (interruptHandle.queuedHandles()) {
                        impl->interrupt();
                        interruptHandle.impl->setActive();
                    } else {
                        interruptHandle.impl->setInactive();
                    }
                }
                }
                if (wrappedHandle) {
                    PollerHandlePrivate& eh = *wrappedHandle->impl;
                    {
                    ScopedLock<Mutex> l(eh.lock);
                    if (!eh.isDeleted()) {
                        // Mark the handle inactive (unless it is idle) before
                        // handing it to the caller, and remember it so that it
                        // is passed to resetMode() on the next call to wait().
                        if (!eh.isIdle()) {
                            eh.setInactive();
                        }
                        lastReturnedHandle = &eh;
                        assert(eh.pollerHandle == wrappedHandle);
                        return Event(wrappedHandle, INTERRUPTED);
                    }
                    }
                    // The handle was deleted while its interrupt was queued:
                    // hand it to the deletion manager now that eh.lock has
                    // been released, and report nothing for it.
                    PollerHandleDeletionManager.markForDeletion(&eh);
                }
                // Nothing to report for this interrupt; go round the loop again.
                continue;
            }

            // Check for shutdown
            if (impl->isShutdown) {
                PollerHandleDeletionManager.markAllUnusedInThisThread();
                return Event(0, SHUTDOWN);
            }

            // Not the interrupt handle, so the epoll data pointer is treated
            // as the PollerHandlePrivate of a regular handle.
            // NOTE(review): relies on handles being registered with their
            // PollerHandlePrivate as epoll data - registration is not visible
            // here, confirm.
            PollerHandlePrivate& eh = *static_cast<PollerHandlePrivate*>(dataPtr);
            // Held until this branch is left (by return or fall-through).
            ScopedLock<Mutex> l(eh.lock);

            // the handle could have gone inactive since we left the epoll_wait
            if (eh.isActive()) {
                PollerHandle* handle = eh.pollerHandle;
                assert(handle);

                // If the connection has been hungup we could still be readable
                // (just not writable), allow us to be readable until we get here again
                if (epe.events & ::EPOLLHUP) {
                    if (eh.isHungup()) {
                        eh.setInactive();
                        // Don't set up last Handle so that we don't reset this handle
                        // on re-entering Poller::wait. This means that we will never
                        // be set active again once we've returned disconnected, and so
                        // can never be returned again.
                        return Event(handle, DISCONNECTED);
                    }
                    // First EPOLLHUP for this handle: remember it and report
                    // this event normally below; a further EPOLLHUP will take
                    // the DISCONNECTED path above.
                    eh.setHungup();
                } else {
                    eh.setInactive();
                }
                lastReturnedHandle = &eh;
                return Event(handle, PollerPrivate::epollToDirection(epe.events));
            }
        }
        // We only get here if one of the following:
        // * epoll_wait was interrupted by a signal
        // * epoll_wait timed out
        // * the state of the handle changed after being returned by epoll_wait
        //
        // The only things we can do here are return a timeout or wait more.
        // Obviously if we timed out we return timeout; if the wait was meant to
        // be indefinite then we should never return with a time out so we go again.
        // If the wait wasn't indefinite, we check whether we are after the target wait
        // time or not
        if (timeout == TIME_INFINITE) {
            continue;
        }
        // TIMEOUT is only reported when epoll_wait itself returned no event
        // (rc == 0); after EINTR or a stale handle the loop goes round again
        // and the next pass uses timeoutMs == 0 if the deadline has passed.
        // NOTE(review): this test is a strict '>' whereas the pre-wait check
        // above also treats now == targetTimeout as expired; in the equal case
        // the loop makes one more zero-timeout pass - confirm this is intended.
        if (rc == 0 && now() > targetTimeout) {
            PollerHandleDeletionManager.markAllUnusedInThisThread();
            return Event(0, TIMEOUT);
        }
    } while (true);
}