in java/org/apache/tomcat/util/net/AprEndpoint.java [1260:1537]
public void run() {
    // Poller main loop. While pollerRunning is true, each iteration:
    //   1. idles (running maintain() and waiting on this monitor) for as
    //      long as there are no connections and nothing is queued in
    //      addList/closeList;
    //   2. snapshots closeList and addList into method-local copies;
    //   3. closes and destroys every socket in the close snapshot;
    //   4. registers every socket in the add snapshot with the APR poller;
    //   5. polls and hands each signalled socket to processSocket();
    //   6. replaces the poller after a critical poll error;
    //   7. runs maintain() to process socket timeouts.
    // Once the loop exits, every thread waiting on this monitor is notified.
    SocketList localAddList = new SocketList(getMaxConnections());
    SocketList localCloseList = new SocketList(getMaxConnections());

    // Loop until we receive a shutdown command
    while (pollerRunning) {

        // Idle phase: nothing is being polled and nothing is queued, so
        // only timeouts need checking.
        while (pollerRunning && connectionCount.get() < 1 &&
                addList.size() < 1 && closeList.size() < 1) {
            try {
                if (getConnectionTimeout() > 0 && pollerRunning) {
                    maintain();
                }
                synchronized (this) {
                    // Re-check the lists while holding the monitor. A
                    // socket may have been queued since the loop condition
                    // was evaluated; the notify() issued by add()/close()
                    // would then already have fired and, without this
                    // check, the poller would sleep for the full 10s with
                    // work pending.
                    if (pollerRunning && addList.size() < 1 && closeList.size() < 1) {
                        this.wait(10000);
                    }
                }
            } catch (InterruptedException e) {
                // Ignore: the loop conditions are re-evaluated immediately.
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                // Pass the throwable so the cause appears in the log, as
                // the other catch blocks in this method already do.
                getLog().warn(sm.getString("endpoint.timeout.err"), t);
            }
        }

        // Don't add or poll if the poller has been stopped
        if (!pollerRunning) {
            break;
        }

        try {
            // Copy the shared close and add lists into the local lists so
            // that the monitor is held only for the copy, not for the
            // socket work that follows.
            synchronized (this) {
                if (closeList.size() > 0) {
                    closeList.duplicate(localCloseList);
                    closeList.clear();
                } else {
                    localCloseList.clear();
                }
            }
            synchronized (this) {
                if (addList.size() > 0) {
                    addList.duplicate(localAddList);
                    addList.clear();
                } else {
                    localAddList.clear();
                }
            }

            // Remove sockets. Closes are handled before adds, and a socket
            // being closed is also dropped from the pending adds so that
            // it is not registered with the poller afterwards.
            if (localCloseList.size() > 0) {
                SocketInfo info = localCloseList.get();
                while (info != null) {
                    localAddList.remove(info.socket);
                    removeFromPoller(info.socket);
                    closeSocketInternal(info.socket);
                    destroySocketInternal(info.socket);
                    info = localCloseList.get();
                }
            }

            // Add sockets which are waiting to the poller
            if (localAddList.size() > 0) {
                SocketInfo info = localAddList.get();
                while (info != null) {
                    if (log.isDebugEnabled()) {
                        log.debug(sm.getString(
                                "endpoint.debug.pollerAddDo",
                                Long.valueOf(info.socket)));
                    }
                    // Discard any previously recorded timeout; a new one is
                    // stored below if the socket is (re-)added.
                    timeouts.remove(info.socket);
                    AprSocketWrapper wrapper =
                            (AprSocketWrapper) connections.get(Long.valueOf(info.socket));
                    // A socket with no wrapper in the connections map is
                    // skipped without further action.
                    if (wrapper != null) {
                        if (info.read() || info.write()) {
                            // Merge the requested events into the flags the
                            // wrapper is already registered for.
                            wrapper.pollerFlags = wrapper.pollerFlags |
                                    (info.read() ? Poll.APR_POLLIN : 0) |
                                    (info.write() ? Poll.APR_POLLOUT : 0);
                            // A socket can only be added to the poller
                            // once. Adding it twice will return an error
                            // which will close the socket. Therefore make
                            // sure the socket we are about to add isn't in
                            // the poller.
                            removeFromPoller(info.socket);
                            if (!addToPoller(info.socket, wrapper.pollerFlags)) {
                                wrapper.close();
                            } else {
                                // Store the absolute time (in ms) at which
                                // this socket times out.
                                timeouts.add(info.socket,
                                        System.currentTimeMillis() +
                                        info.timeout);
                            }
                        } else {
                            // Should never happen: an add request for
                            // neither read nor write.
                            wrapper.close();
                            getLog().warn(sm.getString(
                                    "endpoint.apr.pollAddInvalid", info));
                        }
                    }
                    info = localAddList.get();
                }
            }

            // Flag to ask to reallocate the pool
            boolean reset = false;

            // NOTE(review): the final 'true' argument presumably asks the
            // poller to remove signalled sockets, which would explain the
            // connectionCount decrement below - confirm against Poll.poll.
            int rv = Poll.poll(aprPoller, pollTime, desc, true);
            if (rv > 0) {
                // NOTE(review): mergeDescriptors() is defined elsewhere; it
                // presumably collapses multiple entries for one socket and
                // returns the resulting entry count - confirm.
                rv = mergeDescriptors(desc, rv);
                connectionCount.addAndGet(-rv);
                // desc holds pairs: desc[n*2] is the returned event mask
                // and desc[n*2+1] is the socket.
                for (int n = 0; n < rv; n++) {
                    if (getLog().isDebugEnabled()) {
                        log.debug(sm.getString(
                                "endpoint.debug.pollerProcess",
                                Long.valueOf(desc[n*2+1]),
                                Long.valueOf(desc[n*2])));
                    }
                    long timeout = timeouts.remove(desc[n*2+1]);
                    AprSocketWrapper wrapper = (AprSocketWrapper)
                            connections.get(Long.valueOf(desc[n*2+1]));
                    if (wrapper == null) {
                        // Socket was closed in another thread while still in
                        // the Poller but wasn't removed from the Poller before
                        // new data arrived.
                        continue;
                    }
                    // Clear the events that have just been signalled; what
                    // remains is what the socket is still waiting for.
                    wrapper.pollerFlags = wrapper.pollerFlags & ~((int) desc[n*2]);
                    // Check for failed sockets and hand this socket off to a worker
                    if (((desc[n*2] & Poll.APR_POLLHUP) == Poll.APR_POLLHUP)
                            || ((desc[n*2] & Poll.APR_POLLERR) == Poll.APR_POLLERR)
                            || ((desc[n*2] & Poll.APR_POLLNVAL) == Poll.APR_POLLNVAL)) {
                        // Need to trigger error handling. Poller may return error
                        // codes plus the flags it was waiting for or it may just
                        // return an error code. We could handle the error here but
                        // if we do, there will be no exception associated with the
                        // error in application code. By signaling read/write is
                        // possible, a read/write will be attempted, fail and that
                        // will trigger an exception the application will see.
                        // Check the return flags first, followed by what the socket
                        // was registered for
                        if ((desc[n*2] & Poll.APR_POLLIN) == Poll.APR_POLLIN) {
                            // Error probably occurred during a non-blocking read
                            if (!processSocket(desc[n*2+1], SocketEvent.OPEN_READ)) {
                                // Close socket and clear pool
                                wrapper.close();
                            }
                        } else if ((desc[n*2] & Poll.APR_POLLOUT) == Poll.APR_POLLOUT) {
                            // Error probably occurred during a non-blocking write
                            if (!processSocket(desc[n*2+1], SocketEvent.OPEN_WRITE)) {
                                // Close socket and clear pool
                                wrapper.close();
                            }
                        } else if ((wrapper.pollerFlags & Poll.APR_POLLIN) == Poll.APR_POLLIN) {
                            // Can't tell what was happening when the error occurred but the
                            // socket is registered for non-blocking read so use that
                            if (!processSocket(desc[n*2+1], SocketEvent.OPEN_READ)) {
                                // Close socket and clear pool
                                wrapper.close();
                            }
                        } else if ((wrapper.pollerFlags & Poll.APR_POLLOUT) == Poll.APR_POLLOUT) {
                            // Can't tell what was happening when the error occurred but the
                            // socket is registered for non-blocking write so use that
                            if (!processSocket(desc[n*2+1], SocketEvent.OPEN_WRITE)) {
                                // Close socket and clear pool
                                wrapper.close();
                            }
                        } else {
                            // Close socket and clear pool
                            wrapper.close();
                        }
                    } else if (((desc[n*2] & Poll.APR_POLLIN) == Poll.APR_POLLIN)
                            || ((desc[n*2] & Poll.APR_POLLOUT) == Poll.APR_POLLOUT)) {
                        // Normal readiness: dispatch read first, then write,
                        // stopping at the first dispatch that fails.
                        boolean error = false;
                        if (((desc[n*2] & Poll.APR_POLLIN) == Poll.APR_POLLIN) &&
                                !processSocket(desc[n*2+1], SocketEvent.OPEN_READ)) {
                            error = true;
                            // Close socket and clear pool
                            wrapper.close();
                        }
                        if (!error &&
                                ((desc[n*2] & Poll.APR_POLLOUT) == Poll.APR_POLLOUT) &&
                                !processSocket(desc[n*2+1], SocketEvent.OPEN_WRITE)) {
                            // Close socket and clear pool
                            error = true;
                            wrapper.close();
                        }
                        if (!error && wrapper.pollerFlags != 0) {
                            // If socket was registered for multiple events but
                            // only some of them occurred, re-register for the
                            // remaining events.
                            // timeout is the value of System.currentTimeMillis() that
                            // was set as the point that the socket will timeout. When
                            // adding to the poller, the timeout from now in
                            // milliseconds is required.
                            // So first, subtract the current timestamp
                            if (timeout > 0) {
                                timeout = timeout - System.currentTimeMillis();
                            }
                            // If the socket should have already expired by now,
                            // re-add it with a very short timeout
                            if (timeout <= 0) {
                                timeout = 1;
                            }
                            // Should be impossible but just in case since timeout will
                            // be cast to an int.
                            if (timeout > Integer.MAX_VALUE) {
                                timeout = Integer.MAX_VALUE;
                            }
                            add(desc[n*2+1], (int) timeout, wrapper.pollerFlags);
                        }
                    } else {
                        // Unknown event
                        getLog().warn(sm.getString(
                                "endpoint.apr.pollUnknownEvent",
                                Long.valueOf(desc[n*2])));
                        // Close socket and clear pool
                        wrapper.close();
                    }
                }
            } else if (rv < 0) {
                // A negative return value is a negated status code.
                int errn = -rv;
                // Any non timeup or interrupted error is critical
                if ((errn != Status.TIMEUP) && (errn != Status.EINTR)) {
                    // Values above APR_OS_START_USERERR are rebased before
                    // being reported and passed to strerror().
                    if (errn > Status.APR_OS_START_USERERR) {
                        errn -= Status.APR_OS_START_USERERR;
                    }
                    getLog().error(sm.getString(
                            "endpoint.apr.pollError",
                            Integer.valueOf(errn),
                            Error.strerror(errn)));
                    // Destroy and reallocate the poller
                    reset = true;
                }
            }

            if (reset && pollerRunning) {
                // Reallocate the current poller.
                // NOTE(review): Poll.pollset() presumably returns the number
                // of sockets still held by the old poller - confirm.
                int count = Poll.pollset(aprPoller, desc);
                long newPoller = allocatePoller(pollerSize, pool, -1);
                // The sockets held by the old poller are not re-added to
                // the new one; only the connection count is adjusted.
                connectionCount.addAndGet(-count);
                Poll.destroy(aprPoller);
                aprPoller = newPoller;
            }
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            getLog().warn(sm.getString("endpoint.poll.error"), t);
        }

        try {
            // Process socket timeouts
            if (getConnectionTimeout() > 0 && pollerRunning) {
                // This works and uses only one timeout mechanism for everything, but the
                // non event poller might be a bit faster by using the old maintain.
                maintain();
            }
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            getLog().warn(sm.getString("endpoint.timeout.err"), t);
        }
    }

    // The poller has stopped: wake every thread waiting on this monitor.
    synchronized (this) {
        this.notifyAll();
    }
}