in src/qpid/sys/posix/PosixPoller.cpp [363:487]
/**
 * Produce the next event for the caller, blocking in poll() when nothing is
 * immediately available.
 *
 * Each pass of the main loop checks, in this order:
 *   1. handles queued in interruptedHandles -> Event(handle, Poller::INTERRUPTED)
 *   2. pollerPrivate.isShutdown             -> Event(0, Poller::SHUTDOWN)
 *   3. unconsumed revents from the last poll() -> Event(handle, direction) or
 *      Event(handle, Poller::DISCONNECTED) on the second POLLHUP for a handle
 *   4. a poll() that expired on the previous pass -> Event(0, Poller::TIMEOUT)
 * and otherwise takes a snapshot of the registered handles and calls poll().
 *
 * streamLock is held for the whole call except around the snapshot/poll()
 * section.
 *
 * @param targetTimeout absolute deadline; FAR_FUTURE means wait indefinitely.
 * @return the next Event as described above.
 *
 * NOTE(review): a poll() failure other than EINTR is handed to
 * QPID_POSIX_CHECK, which presumably throws - confirm against its definition.
 */
Event getEvent(AbsTime targetTimeout) {
    bool timeoutPending = false;
    ScopedLock<Mutex> l(streamLock); // hold lock except for poll()
    // loop until poll event, async interrupt, or timeout
    while (true) {
        // first check for any interrupts
        while (interruptedHandles.size() > 0) {
            PollerHandlePrivate& eh = *interruptedHandles.front();
            interruptedHandles.pop();
            {
                ScopedLock<Mutex> lk(eh.lock);
                if (!eh.isDeleted()) {
                    if (!eh.isIdle()) {
                        eh.setInactive();
                    }
                    // nullify the corresponding pollfd event, if any
                    int ehfd = eh.fd();
                    // Index 0 is the self pipe, so start at 1. An index loop (rather
                    // than begin() + 1) stays well defined even if pollfds is empty.
                    for (std::vector<struct ::pollfd>::size_type idx = 1; idx < pollfds.size(); ++idx) {
                        struct ::pollfd& pfd = pollfds[idx];
                        if (pfd.fd == ehfd) {
                            pfd.events = 0;
                            if (pfd.revents) {
                                // this pending event will never be delivered
                                pfd.revents = 0;
                                pollCount--;
                            }
                            break;
                        }
                    }
                    return Event(eh.pollerHandle, Poller::INTERRUPTED);
                }
            }
            // handle was deleted before its interrupt could be delivered
            PollerHandleDeletionManager.markForDeletion(&eh);
        }
        // Check for shutdown
        if (pollerPrivate.isShutdown) {
            PollerHandleDeletionManager.markAllUnusedInThisThread();
            return Event(0, Poller::SHUTDOWN);
        }
        // search for any remaining events from earlier poll()
        int nfds = pollfds.size();
        while ((pollCount > 0) && (currentPollfd < nfds)) {
            int index = currentPollfd++;
            short evt = pollfds[index].revents;
            if (evt != 0) {
                pollCount--;
                PollerHandlePrivate& eh = *pollHandles[index];
                // named distinctly so it does not shadow the streamLock guard above
                ScopedLock<Mutex> handleGuard(eh.lock);
                // stop polling this handle until resetMode()
                pollfds[index].events = 0;
                // the handle could have gone inactive since snapshot taken;
                // if so the event is dropped and the scan continues
                if (eh.isActive()) {
                    PollerHandle* handle = eh.pollerHandle;
                    assert(handle);
                    // If the connection has been hung up we could still be readable
                    // (just not writable), so allow it to stay readable until we get
                    // here again
                    if (evt & POLLHUP) {
                        if (eh.isHungup()) {
                            eh.setInactive();
                            // Don't set up last Handle so that we don't reset this handle
                            // on re-entering Poller::wait. This means that we will never
                            // be set active again once we've returned disconnected, and so
                            // can never be returned again.
                            return Event(handle, Poller::DISCONNECTED);
                        }
                        eh.setHungup();
                    } else {
                        eh.setInactive();
                    }
                    return Event(handle, PollerPrivate::epollToDirection(evt));
                }
            }
        }
        if (timeoutPending) {
            return Event(0, Poller::TIMEOUT);
        }
        // no outstanding events, poll() for more
        {
            ScopedUnlock<Mutex> ul(streamLock);
            bool refreshed = pollerPrivate.registeredHandles.snapshot(pollHandles, pollfds);
            if (refreshed) {
                // we just drained all interruptedHandles and got a fresh snapshot
                PollerHandleDeletionManager.markAllUnusedInThisThread();
            }
            if (!signalPipe.isSet()) {
                int timeoutMs = -1; // negative: poll() blocks with no timeout
                // true when the deadline is further away than poll() can express
                bool timeoutClamped = false;
                if (!(targetTimeout == FAR_FUTURE)) {
                    // Work in 64 bits: assigning the millisecond count straight to an
                    // int overflows for deadlines more than ~24.8 days ahead, which
                    // could turn a long wait into an immediate spurious timeout.
                    // 0x7fffffff fits in int on every POSIX platform (int >= 32 bits).
                    const long long maxPollMs = 0x7fffffff;
                    const long long remainingMs = Duration(now(), targetTimeout) / TIME_MSEC;
                    if (remainingMs < 0) {
                        timeoutMs = 0; // deadline already passed: just poll once
                    } else if (remainingMs > maxPollMs) {
                        timeoutMs = static_cast<int>(maxPollMs);
                        timeoutClamped = true;
                    } else {
                        timeoutMs = static_cast<int>(remainingMs);
                    }
                }
                pollCount = ::poll(&pollfds[0], pollfds.size(), timeoutMs);
                if (pollCount ==-1 && errno != EINTR) {
                    QPID_POSIX_CHECK(pollCount);
                }
                else if (pollCount < 0) {
                    // EINTR: no events were reported, so there is nothing to scan;
                    // go round the loop and poll again
                    pollCount = 0;
                }
                else if (pollCount == 0) {
                    // timeout, unless shutdown or interrupt arrives in another thread.
                    // If the wait was clamped the real deadline has not been reached
                    // yet, so loop and poll again for the remainder instead.
                    timeoutPending = !timeoutClamped;
                }
                else {
                    if (pollfds[0].revents) {
                        pollCount--; // signal pipe doesn't count
                    }
                }
            }
            else
                pollCount = 0; // signal already raised: skip poll() and re-check state
            signalPipe.reset();
        }
        // restart the scan just after the self pipe entry
        currentPollfd = 1;
    }
}