public void run()

in java/org/apache/tomcat/util/net/AprEndpoint.java [1260:1537]


        public void run() {

            SocketList localAddList = new SocketList(getMaxConnections());
            SocketList localCloseList = new SocketList(getMaxConnections());

            // Loop until we receive a shutdown command
            while (pollerRunning) {

                // Check timeouts if the poller is empty.
                while (pollerRunning && connectionCount.get() < 1 &&
                        addList.size() < 1 && closeList.size() < 1) {
                    try {
                        if (getConnectionTimeout() > 0 && pollerRunning) {
                            maintain();
                        }
                        synchronized (this) {
                            // Make sure that no sockets have been placed in the
                            // addList or closeList since the check above.
                            // Without this check there could be a 10s pause
                            // with no processing since the notify() call in
                            // add()/close() would have no effect since it
                            // happened before this sync block was entered
                            if (pollerRunning && addList.size() < 1 && closeList.size() < 1) {
                                this.wait(10000);
                            }
                        }
                    } catch (InterruptedException e) {
                        // Ignore
                    } catch (Throwable t) {
                        ExceptionUtils.handleThrowable(t);
                        getLog().warn(sm.getString("endpoint.timeout.err"));
                    }
                }

                // Don't add or poll if the poller has been stopped
                if (!pollerRunning) {
                    break;
                }

                try {
                    // Duplicate the add and remove lists so that the syncs are
                    // minimised
                    synchronized (this) {
                        if (closeList.size() > 0) {
                            // Duplicate to another list, so that the syncing is
                            // minimal
                            closeList.duplicate(localCloseList);
                            closeList.clear();
                        } else {
                            localCloseList.clear();
                        }
                    }
                    synchronized (this) {
                        if (addList.size() > 0) {
                            // Duplicate to another list, so that the syncing is
                            // minimal
                            addList.duplicate(localAddList);
                            addList.clear();
                        } else {
                            localAddList.clear();
                        }
                    }

                    // Remove sockets
                    if (localCloseList.size() > 0) {
                        SocketInfo info = localCloseList.get();
                        while (info != null) {
                            localAddList.remove(info.socket);
                            removeFromPoller(info.socket);
                            closeSocketInternal(info.socket);
                            destroySocketInternal(info.socket);
                            info = localCloseList.get();
                        }
                    }

                    // Add sockets which are waiting to the poller
                    if (localAddList.size() > 0) {
                        SocketInfo info = localAddList.get();
                        while (info != null) {
                            if (log.isDebugEnabled()) {
                                log.debug(sm.getString(
                                        "endpoint.debug.pollerAddDo",
                                        Long.valueOf(info.socket)));
                            }
                            timeouts.remove(info.socket);
                            AprSocketWrapper wrapper =
                                    (AprSocketWrapper) connections.get(Long.valueOf(info.socket));
                            if (wrapper != null) {
                                if (info.read() || info.write()) {
                                    wrapper.pollerFlags = wrapper.pollerFlags |
                                            (info.read() ? Poll.APR_POLLIN : 0) |
                                            (info.write() ? Poll.APR_POLLOUT : 0);
                                    // A socket can only be added to the poller
                                    // once. Adding it twice will return an error
                                    // which will close the socket. Therefore make
                                    // sure the socket we are about to add isn't in
                                    // the poller.
                                    removeFromPoller(info.socket);
                                    if (!addToPoller(info.socket, wrapper.pollerFlags)) {
                                        wrapper.close();
                                    } else {
                                        timeouts.add(info.socket,
                                                System.currentTimeMillis() +
                                                        info.timeout);
                                    }
                                } else {
                                    // Should never happen.
                                    wrapper.close();
                                    getLog().warn(sm.getString(
                                            "endpoint.apr.pollAddInvalid", info));
                                }
                            }
                            info = localAddList.get();
                        }
                    }

                    // Flag to ask to reallocate the pool
                    boolean reset = false;

                    int rv = Poll.poll(aprPoller, pollTime, desc, true);
                    if (rv > 0) {
                        rv = mergeDescriptors(desc, rv);
                        connectionCount.addAndGet(-rv);
                        for (int n = 0; n < rv; n++) {
                            if (getLog().isDebugEnabled()) {
                                log.debug(sm.getString(
                                        "endpoint.debug.pollerProcess",
                                        Long.valueOf(desc[n*2+1]),
                                        Long.valueOf(desc[n*2])));
                            }
                            long timeout = timeouts.remove(desc[n*2+1]);
                            AprSocketWrapper wrapper = (AprSocketWrapper)
                                    connections.get(Long.valueOf(desc[n*2+1]));
                            if (wrapper == null) {
                                // Socket was closed in another thread while still in
                                // the Poller but wasn't removed from the Poller before
                                // new data arrived.
                                continue;
                            }
                            wrapper.pollerFlags = wrapper.pollerFlags & ~((int) desc[n*2]);
                            // Check for failed sockets and hand this socket off to a worker
                            if (((desc[n*2] & Poll.APR_POLLHUP) == Poll.APR_POLLHUP)
                                    || ((desc[n*2] & Poll.APR_POLLERR) == Poll.APR_POLLERR)
                                    || ((desc[n*2] & Poll.APR_POLLNVAL) == Poll.APR_POLLNVAL)) {
                                // Need to trigger error handling. Poller may return error
                                // codes plus the flags it was waiting for or it may just
                                // return an error code. We could handle the error here but
                                // if we do, there will be no exception associated with the
                                // error in application code. By signaling read/write is
                                // possible, a read/write will be attempted, fail and that
                                // will trigger an exception the application will see.
                                // Check the return flags first, followed by what the socket
                                // was registered for
                                if ((desc[n*2] & Poll.APR_POLLIN) == Poll.APR_POLLIN) {
                                    // Error probably occurred during a non-blocking read
                                    if (!processSocket(desc[n*2+1], SocketEvent.OPEN_READ)) {
                                        // Close socket and clear pool
                                        wrapper.close();
                                    }
                                } else if ((desc[n*2] & Poll.APR_POLLOUT) == Poll.APR_POLLOUT) {
                                    // Error probably occurred during a non-blocking write
                                    if (!processSocket(desc[n*2+1], SocketEvent.OPEN_WRITE)) {
                                        // Close socket and clear pool
                                        wrapper.close();
                                    }
                                } else if ((wrapper.pollerFlags & Poll.APR_POLLIN) == Poll.APR_POLLIN) {
                                    // Can't tell what was happening when the error occurred but the
                                    // socket is registered for non-blocking read so use that
                                    if (!processSocket(desc[n*2+1], SocketEvent.OPEN_READ)) {
                                        // Close socket and clear pool
                                        wrapper.close();
                                    }
                                } else if ((wrapper.pollerFlags & Poll.APR_POLLOUT) == Poll.APR_POLLOUT) {
                                    // Can't tell what was happening when the error occurred but the
                                    // socket is registered for non-blocking write so use that
                                    if (!processSocket(desc[n*2+1], SocketEvent.OPEN_WRITE)) {
                                        // Close socket and clear pool
                                        wrapper.close();
                                    }
                                } else {
                                    // Close socket and clear pool
                                    wrapper.close();
                                }
                            } else if (((desc[n*2] & Poll.APR_POLLIN) == Poll.APR_POLLIN)
                                    || ((desc[n*2] & Poll.APR_POLLOUT) == Poll.APR_POLLOUT)) {
                                boolean error = false;
                                if (((desc[n*2] & Poll.APR_POLLIN) == Poll.APR_POLLIN) &&
                                        !processSocket(desc[n*2+1], SocketEvent.OPEN_READ)) {
                                    error = true;
                                    // Close socket and clear pool
                                    wrapper.close();
                                }
                                if (!error &&
                                        ((desc[n*2] & Poll.APR_POLLOUT) == Poll.APR_POLLOUT) &&
                                        !processSocket(desc[n*2+1], SocketEvent.OPEN_WRITE)) {
                                    // Close socket and clear pool
                                    error = true;
                                    wrapper.close();
                                }
                                if (!error && wrapper.pollerFlags != 0) {
                                    // If socket was registered for multiple events but
                                    // only some of the occurred, re-register for the
                                    // remaining events.
                                    // timeout is the value of System.currentTimeMillis() that
                                    // was set as the point that the socket will timeout. When
                                    // adding to the poller, the timeout from now in
                                    // milliseconds is required.
                                    // So first, subtract the current timestamp
                                    if (timeout > 0) {
                                        timeout = timeout - System.currentTimeMillis();
                                    }
                                    // If the socket should have already expired by now,
                                    // re-add it with a very short timeout
                                    if (timeout <= 0) {
                                        timeout = 1;
                                    }
                                    // Should be impossible but just in case since timeout will
                                    // be cast to an int.
                                    if (timeout > Integer.MAX_VALUE) {
                                        timeout = Integer.MAX_VALUE;
                                    }
                                    add(desc[n*2+1], (int) timeout, wrapper.pollerFlags);
                                }
                            } else {
                                // Unknown event
                                getLog().warn(sm.getString(
                                        "endpoint.apr.pollUnknownEvent",
                                        Long.valueOf(desc[n*2])));
                                // Close socket and clear pool
                                wrapper.close();
                            }
                        }
                    } else if (rv < 0) {
                        int errn = -rv;
                        // Any non timeup or interrupted error is critical
                        if ((errn != Status.TIMEUP) && (errn != Status.EINTR)) {
                            if (errn >  Status.APR_OS_START_USERERR) {
                                errn -=  Status.APR_OS_START_USERERR;
                            }
                            getLog().error(sm.getString(
                                    "endpoint.apr.pollError",
                                    Integer.valueOf(errn),
                                    Error.strerror(errn)));
                            // Destroy and reallocate the poller
                            reset = true;
                        }
                    }

                    if (reset && pollerRunning) {
                        // Reallocate the current poller
                        int count = Poll.pollset(aprPoller, desc);
                        long newPoller = allocatePoller(pollerSize, pool, -1);
                        // Don't restore connections for now, since I have not tested it
                        connectionCount.addAndGet(-count);
                        Poll.destroy(aprPoller);
                        aprPoller = newPoller;
                    }
                } catch (Throwable t) {
                    ExceptionUtils.handleThrowable(t);
                    getLog().warn(sm.getString("endpoint.poll.error"), t);
                }
                try {
                    // Process socket timeouts
                    if (getConnectionTimeout() > 0 && pollerRunning) {
                        // This works and uses only one timeout mechanism for everything, but the
                        // non event poller might be a bit faster by using the old maintain.
                        maintain();
                    }
                } catch (Throwable t) {
                    ExceptionUtils.handleThrowable(t);
                    getLog().warn(sm.getString("endpoint.timeout.err"), t);
                }
            }

            synchronized (this) {
                this.notifyAll();
            }
        }