public void run()

in hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java [118:208]


        /**
         * Event loop of this I/O thread.
         *
         * <p>Each iteration blocks in {@code selector.select()}, calls {@code collectOutstandingWork()},
         * and then performs three steps in order:
         * <ol>
         *   <li>opens a non-blocking channel for every address in {@code workingPendingConnections}
         *       and either creates the connection immediately or registers the channel for
         *       {@code OP_CONNECT};</li>
         *   <li>registers every channel in {@code workingIncomingConnections} with the selector and
         *       reports it to the connection listener as an accepted connection;</li>
         *   <li>dispatches the selected keys: read/write readiness goes to the connection's event
         *       listener, accept readiness to {@code distributeIncomingConnection}, and connect
         *       readiness to {@code finishConnect} / {@code createConnection}.</li>
         * </ol>
         *
         * <p>Any exception escaping an iteration is printed and the loop continues; the loop only
         * terminates when the selector has been closed.
         */
        public void run() {
            while (true) {
                try {
                    int n = selector.select();
                    collectOutstandingWork();
                    if (!workingPendingConnections.isEmpty()) {
                        for (InetSocketAddress address : workingPendingConnections) {
                            SocketChannel channel = SocketChannel.open();
                            // Becomes true once the channel owns a selection key; until then this
                            // loop is responsible for closing it on any failure.
                            boolean registered = false;
                            try {
                                channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
                                channel.configureBlocking(false);
                                boolean connect;
                                try {
                                    connect = channel.connect(address);
                                } catch (IOException | RuntimeException e) {
                                    // Unchecked failures (e.g. an unresolved address) are connection
                                    // failures too; report them instead of aborting the whole pass.
                                    synchronized (connectionListener) {
                                        connectionListener.connectionFailure(address);
                                    }
                                    continue;
                                }
                                SelectionKey key =
                                        channel.register(selector, connect ? 0 : SelectionKey.OP_CONNECT);
                                registered = true;
                                if (connect) {
                                    // Connected synchronously: no OP_CONNECT round trip needed.
                                    createConnection(key, channel);
                                } else {
                                    // Remember the target so a later failure can be reported for it.
                                    key.attach(address);
                                }
                            } finally {
                                if (!registered) {
                                    try {
                                        channel.close();
                                    } catch (IOException ignored) {
                                        // Best-effort cleanup of a channel that never got registered.
                                    }
                                }
                            }
                        }
                        workingPendingConnections.clear();
                    }
                    if (!workingIncomingConnections.isEmpty()) {
                        for (SocketChannel channel : workingIncomingConnections) {
                            channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
                            channel.configureBlocking(false);
                            // Registered with no interest ops; the connection receives the key and
                            // selector so interest can be adjusted later.
                            SelectionKey sKey = channel.register(selector, 0);
                            TCPConnection connection = new TCPConnection(TCPEndpoint.this, channel, sKey, selector);
                            sKey.attach(connection);
                            synchronized (connectionListener) {
                                connectionListener.acceptedConnection(connection);
                            }
                        }
                        workingIncomingConnections.clear();
                    }
                    if (n > 0) {
                        Iterator<SelectionKey> i = selector.selectedKeys().iterator();
                        while (i.hasNext()) {
                            SelectionKey key = i.next();
                            i.remove();
                            if (!key.isValid()) {
                                // Cancelled since selection; querying readiness would throw.
                                continue;
                            }
                            SelectableChannel sc = key.channel();
                            // Snapshot all readiness flags before dispatching: the listener may close
                            // the connection, after which the key can no longer be queried.
                            boolean readable = key.isReadable();
                            boolean writable = key.isWritable();
                            boolean acceptable = key.isAcceptable();
                            boolean connectable = key.isConnectable();

                            if (readable || writable) {
                                TCPConnection connection = (TCPConnection) key.attachment();
                                try {
                                    connection.getEventListener().notifyIOReady(connection, readable, writable);
                                } catch (Exception e) {
                                    connection.getEventListener().notifyIOError(e);
                                    connection.close();
                                    continue;
                                }
                            }
                            if (acceptable) {
                                assert sc == serverSocketChannel;
                                SocketChannel channel = serverSocketChannel.accept();
                                // accept() returns null on a non-blocking channel when no connection
                                // is pending any more.
                                if (channel != null) {
                                    distributeIncomingConnection(channel);
                                }
                            } else if (connectable) {
                                SocketChannel channel = (SocketChannel) sc;
                                boolean finishConnect = false;
                                try {
                                    finishConnect = channel.finishConnect();
                                } catch (Exception e) {
                                    e.printStackTrace();
                                    key.cancel();
                                    // finishConnect closes the channel itself on checked exceptions;
                                    // close explicitly so unchecked failures do not leave it open.
                                    try {
                                        channel.close();
                                    } catch (IOException ignored) {
                                        // Best-effort cleanup; the failure is reported below.
                                    }
                                    synchronized (connectionListener) {
                                        connectionListener.connectionFailure((InetSocketAddress) key.attachment());
                                    }
                                }
                                if (finishConnect) {
                                    createConnection(key, channel);
                                }
                            }
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    if (!selector.isOpen()) {
                        // A closed selector fails every subsequent select(); stop instead of
                        // spinning forever on the same exception.
                        return;
                    }
                }
            }
        }