in src/qpid/sys/epoll/EpollPoller.cpp [549:679]
Poller::Event Poller::wait(Duration timeout) {
static __thread PollerHandlePrivate* lastReturnedHandle = 0;
// Make sure lighly used threads regularly purge DeletionManager memory.
static const Duration maxEpollWait = 60 * TIME_SEC;
epoll_event epe;
AbsTime targetTimeout =
(timeout == TIME_INFINITE) ?
FAR_FUTURE :
AbsTime(now(), timeout);
if (lastReturnedHandle) {
impl->resetMode(*lastReturnedHandle);
lastReturnedHandle = 0;
}
// Repeat until we weren't interrupted by signal
do {
PollerHandleDeletionManager.markAllUnusedInThisThread();
int timeoutMs;
AbsTime now_(now());
if (timeout == TIME_INFINITE) {
timeoutMs = maxEpollWait / TIME_MSEC;
} else if (now_ > targetTimeout || now_ == targetTimeout) {
timeoutMs = 0;
} else {
// Account for truncation when converting to millisecs.
Duration remaining(now_, AbsTime(targetTimeout, TIME_MSEC - 1));
timeoutMs = std::min(remaining, maxEpollWait) / TIME_MSEC;
}
int rc = ::epoll_wait(impl->epollFd, &epe, 1, timeoutMs);
if (rc ==-1 && errno != EINTR) {
QPID_POSIX_CHECK(rc);
} else if (rc > 0) {
assert(rc == 1);
void* dataPtr = epe.data.ptr;
// Check if this is an interrupt
PollerPrivate::InterruptHandle& interruptHandle = impl->interruptHandle;
if (dataPtr == &interruptHandle) {
// If we are shutting down we need to rearm the shutdown interrupt to
// ensure everyone still sees it. It's okay that this might be overridden
// below as we will be back here if it is.
if (impl->isShutdown) {
impl->interruptAll();
}
PollerHandle* wrappedHandle = 0;
{
ScopedLock<Mutex> l(interruptHandle.impl->lock);
if (interruptHandle.impl->isActive()) {
wrappedHandle = interruptHandle.getHandle();
// If there is an interrupt queued behind this one we need to arm it
// We do it this way so that another thread can pick it up
if (interruptHandle.queuedHandles()) {
impl->interrupt();
interruptHandle.impl->setActive();
} else {
interruptHandle.impl->setInactive();
}
}
}
if (wrappedHandle) {
PollerHandlePrivate& eh = *wrappedHandle->impl;
{
ScopedLock<Mutex> l(eh.lock);
if (!eh.isDeleted()) {
if (!eh.isIdle()) {
eh.setInactive();
}
lastReturnedHandle = &eh;
assert(eh.pollerHandle == wrappedHandle);
return Event(wrappedHandle, INTERRUPTED);
}
}
PollerHandleDeletionManager.markForDeletion(&eh);
}
continue;
}
// Check for shutdown
if (impl->isShutdown) {
PollerHandleDeletionManager.markAllUnusedInThisThread();
return Event(0, SHUTDOWN);
}
PollerHandlePrivate& eh = *static_cast<PollerHandlePrivate*>(dataPtr);
ScopedLock<Mutex> l(eh.lock);
// the handle could have gone inactive since we left the epoll_wait
if (eh.isActive()) {
PollerHandle* handle = eh.pollerHandle;
assert(handle);
// If the connection has been hungup we could still be readable
// (just not writable), allow us to readable until we get here again
if (epe.events & ::EPOLLHUP) {
if (eh.isHungup()) {
eh.setInactive();
// Don't set up last Handle so that we don't reset this handle
// on re-entering Poller::wait. This means that we will never
// be set active again once we've returned disconnected, and so
// can never be returned again.
return Event(handle, DISCONNECTED);
}
eh.setHungup();
} else {
eh.setInactive();
}
lastReturnedHandle = &eh;
return Event(handle, PollerPrivate::epollToDirection(epe.events));
}
}
// We only get here if one of the following:
// * epoll_wait was interrupted by a signal
// * epoll_wait timed out
// * the state of the handle changed after being returned by epoll_wait
//
// The only things we can do here are return a timeout or wait more.
// Obviously if we timed out we return timeout; if the wait was meant to
// be indefinite then we should never return with a time out so we go again.
// If the wait wasn't indefinite, we check whether we are after the target wait
// time or not
if (timeout == TIME_INFINITE) {
continue;
}
if (rc == 0 && now() > targetTimeout) {
PollerHandleDeletionManager.markAllUnusedInThisThread();
return Event(0, TIMEOUT);
}
} while (true);
}