Event getEvent()

in src/qpid/sys/posix/PosixPoller.cpp [363:487]


        Event getEvent(AbsTime targetTimeout) {
            bool timeoutPending = false;

            ScopedLock<Mutex> l(streamLock); // hold lock except for poll()

            // loop until poll event, async interrupt, or timeout
            while (true) {

                // first check for any interrupts
                while (interruptedHandles.size() > 0) {
                    PollerHandlePrivate& eh = *interruptedHandles.front();
                    interruptedHandles.pop();
                    {
                        ScopedLock<Mutex> lk(eh.lock);
                        if (!eh.isDeleted()) {
                            if (!eh.isIdle()) {
                                eh.setInactive();
                            }

                            // nullify the corresponding pollfd event, if any
                            int ehfd = eh.fd();
                            std::vector<struct ::pollfd>::iterator i = pollfds.begin() + 1; // skip self pipe at front
                            for (; i != pollfds.end(); i++) {
                                if (i->fd == ehfd) {
                                    i->events = 0;
                                    if (i->revents) {
                                        i->revents = 0;
                                        pollCount--;
                                    }
                                    break;
                                }
                            }
                            return Event(eh.pollerHandle, Poller::INTERRUPTED);
                        }
                    }
                    PollerHandleDeletionManager.markForDeletion(&eh);
                }

                // Check for shutdown
                if (pollerPrivate.isShutdown) {
                    PollerHandleDeletionManager.markAllUnusedInThisThread();
                    return Event(0, Poller::SHUTDOWN);
                }

                // search for any remaining events from earlier poll()
                int nfds = pollfds.size();
                while ((pollCount > 0) && (currentPollfd < nfds)) {
                    int index = currentPollfd++;
                    short evt = pollfds[index].revents;
                    if (evt != 0) {
                        pollCount--;
                        PollerHandlePrivate& eh = *pollHandles[index];
                        ScopedLock<Mutex> l(eh.lock);
                        // stop polling this handle until resetMode()
                        pollfds[index].events = 0;

                        // the handle could have gone inactive since snapshot taken
                        if (eh.isActive()) {
                            PollerHandle* handle = eh.pollerHandle;
                            assert(handle);

                            // If the connection has been hungup we could still be readable
                            // (just not writable), allow us to readable until we get here again
                            if (evt & POLLHUP) {
                                if (eh.isHungup()) {
                                    eh.setInactive();
                                    // Don't set up last Handle so that we don't reset this handle
                                    // on re-entering Poller::wait. This means that we will never
                                    // be set active again once we've returned disconnected, and so
                                    // can never be returned again.
                                    return Event(handle, Poller::DISCONNECTED);
                                }
                                eh.setHungup();
                            } else {
                                eh.setInactive();
                            }
                            return Event(handle, PollerPrivate::epollToDirection(evt));
                        }
                    }
                }

                if (timeoutPending) {
                    return Event(0, Poller::TIMEOUT);
                }

                // no outstanding events, poll() for more
                {
                    ScopedUnlock<Mutex> ul(streamLock);

                    bool refreshed = pollerPrivate.registeredHandles.snapshot(pollHandles, pollfds);
                    if (refreshed) {
                        // we just drained all interruptedHandles and got a fresh snapshot
                        PollerHandleDeletionManager.markAllUnusedInThisThread();
                    }

                    if (!signalPipe.isSet()) {
                        int timeoutMs = -1;
                        if (!(targetTimeout == FAR_FUTURE)) {
                            timeoutMs = Duration(now(), targetTimeout) / TIME_MSEC;
                            if (timeoutMs < 0)
                                timeoutMs = 0;
                        }

                        pollCount = ::poll(&pollfds[0], pollfds.size(), timeoutMs);

                        if (pollCount ==-1 && errno != EINTR) {
                            QPID_POSIX_CHECK(pollCount);
                        }
                        else if (pollCount == 0) {
                            // timeout, unless shutdown or interrupt arrives in another thread
                            timeoutPending = true;
                        }
                        else {
                            if (pollfds[0].revents) {
                                pollCount--; // signal pipe doesn't count
                            }
                        }
                    }
                    else
                        pollCount = 0;
                    signalPipe.reset();
                }
                currentPollfd = 1;
            }
        }