public void run()

in hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java [187:328]


        public void run() {
            try {
                serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            } catch (ClosedChannelException e) {
                throw new RuntimeException(e);
            }
            BitSet unsentMessagesBitmap = new BitSet();
            List<Message> tempUnsentMessages = new ArrayList<Message>();
            while (!stopped) {
                try {
                    if (LOGGER.isLoggable(Level.FINE)) {
                        LOGGER.fine("Starting Select");
                    }
                    int n = selector.select();
                    collectOutstandingWork();
                    if (!workingPendingConnections.isEmpty()) {
                        for (IPCHandle handle : workingPendingConnections) {
                            SocketChannel channel = SocketChannel.open();
                            channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
                            channel.configureBlocking(false);
                            SelectionKey cKey = null;
                            if (channel.connect(handle.getRemoteAddress())) {
                                cKey = channel.register(selector, SelectionKey.OP_READ);
                                handle.setState(HandleState.CONNECT_SENT);
                                write(createInitialReqMessage(handle));
                            } else {
                                cKey = channel.register(selector, SelectionKey.OP_CONNECT);
                            }
                            handle.setKey(cKey);
                            cKey.attach(handle);
                        }
                        workingPendingConnections.clear();
                    }
                    if (!workingSendList.isEmpty()) {
                        unsentMessagesBitmap.clear();
                        int len = workingSendList.size();
                        for (int i = 0; i < len; ++i) {
                            Message msg = workingSendList.get(i);
                            if (LOGGER.isLoggable(Level.FINE)) {
                                LOGGER.fine("Processing send of message: " + msg);
                            }
                            IPCHandle handle = msg.getIPCHandle();
                            if (handle.getState() != HandleState.CLOSED) {
                                if (!handle.full()) {
                                    while (true) {
                                        ByteBuffer buffer = handle.getOutBuffer();
                                        buffer.compact();
                                        boolean success = msg.write(buffer);
                                        buffer.flip();
                                        if (success) {
                                            system.getPerformanceCounters().addMessageSentCount(1);
                                            SelectionKey key = handle.getKey();
                                            key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
                                        } else {
                                            if (!buffer.hasRemaining()) {
                                                handle.resizeOutBuffer();
                                                continue;
                                            }
                                            handle.markFull();
                                            unsentMessagesBitmap.set(i);
                                        }
                                        break;
                                    }
                                } else {
                                    unsentMessagesBitmap.set(i);
                                }
                            }
                        }
                        copyUnsentMessages(unsentMessagesBitmap, tempUnsentMessages);
                    }
                    if (n > 0) {
                        for (Iterator<SelectionKey> i = selector.selectedKeys().iterator(); i.hasNext();) {
                            SelectionKey key = i.next();
                            i.remove();
                            SelectableChannel sc = key.channel();
                            if (key.isReadable()) {
                                SocketChannel channel = (SocketChannel) sc;
                                IPCHandle handle = (IPCHandle) key.attachment();
                                ByteBuffer readBuffer = handle.getInBuffer();
                                int len = channel.read(readBuffer);
                                system.getPerformanceCounters().addMessageBytesReceived(len);
                                if (len < 0) {
                                    key.cancel();
                                    channel.close();
                                    handle.close();
                                } else {
                                    handle.processIncomingMessages();
                                    if (!readBuffer.hasRemaining()) {
                                        handle.resizeInBuffer();
                                    }
                                }
                            } else if (key.isWritable()) {
                                SocketChannel channel = (SocketChannel) sc;
                                IPCHandle handle = (IPCHandle) key.attachment();
                                ByteBuffer writeBuffer = handle.getOutBuffer();
                                int len = channel.write(writeBuffer);
                                system.getPerformanceCounters().addMessageBytesSent(len);
                                if (len < 0) {
                                    key.cancel();
                                    channel.close();
                                    handle.close();
                                } else if (!writeBuffer.hasRemaining()) {
                                    key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
                                }
                                if (handle.full()) {
                                    handle.clearFull();
                                    selector.wakeup();
                                }
                            } else if (key.isAcceptable()) {
                                assert sc == serverSocketChannel;
                                SocketChannel channel = serverSocketChannel.accept();
                                channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
                                channel.configureBlocking(false);
                                IPCHandle handle = new IPCHandle(system, null);
                                SelectionKey cKey = channel.register(selector, SelectionKey.OP_READ);
                                handle.setKey(cKey);
                                cKey.attach(handle);
                                handle.setState(HandleState.CONNECT_RECEIVED);
                            } else if (key.isConnectable()) {
                                SocketChannel channel = (SocketChannel) sc;
                                IPCHandle handle = (IPCHandle) key.attachment();
                                try {
                                    if (!channel.finishConnect()) {
                                        throw new Exception("Connection did not finish");
                                    }
                                } catch (Exception e) {
                                    e.printStackTrace();
                                    handle.setState(HandleState.CONNECT_FAILED);
                                    continue;
                                }
                                handle.setState(HandleState.CONNECT_SENT);
                                registerHandle(handle);
                                key.interestOps(SelectionKey.OP_READ);
                                write(createInitialReqMessage(handle));
                            }
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }