private void bodyInternal()

in modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java [2042:2331]


        private void bodyInternal() throws IgniteCheckedException, InterruptedException {
            try {
                long lastIdleCheck = U.currentTimeMillis();

                mainLoop:
                while (!closed && selector.isOpen()) {
                    SessionChangeRequest req0;

                    updateHeartbeat();

                    while ((req0 = changeReqs.poll()) != null) {
                        updateHeartbeat();

                        switch (req0.operation()) {
                            case CONNECT: {
                                NioOperationFuture fut = (NioOperationFuture)req0;

                                SocketChannel ch = fut.socketChannel();

                                try {
                                    ch.register(selector, SelectionKey.OP_CONNECT, fut);
                                }
                                catch (IOException e) {
                                    fut.onDone(new IgniteCheckedException("Failed to register channel on selector", e));
                                }

                                break;
                            }

                            case CANCEL_CONNECT: {
                                NioOperationFuture req = (NioOperationFuture)req0;

                                SocketChannel ch = req.socketChannel();

                                SelectionKey key = ch.keyFor(selector);

                                if (key != null)
                                    key.cancel();

                                U.closeQuiet(ch);

                                req.onDone();

                                break;
                            }

                            case REGISTER: {
                                register((NioOperationFuture)req0);

                                break;
                            }

                            case MOVE: {
                                SessionMoveFuture f = (SessionMoveFuture)req0;

                                GridSelectorNioSessionImpl ses = f.session();

                                if (idx == f.toIdx) {
                                    assert f.movedSocketChannel() != null : f;

                                    boolean add = workerSessions.add(ses);

                                    assert add;

                                    ses.finishMoveSession(this);

                                    if (idx % 2 == 0)
                                        readerMoveCnt.incrementAndGet();
                                    else
                                        writerMoveCnt.incrementAndGet();

                                    SelectionKey key = f.movedSocketChannel().register(selector,
                                        SelectionKey.OP_READ | SelectionKey.OP_WRITE,
                                        ses);

                                    ses.key(key);

                                    ses.procWrite.set(true);

                                    f.onDone(true);
                                }
                                else {
                                    assert f.movedSocketChannel() == null : f;

                                    if (workerSessions.remove(ses)) {
                                        ses.startMoveSession(this);

                                        SelectionKey key = ses.key();

                                        assert key.channel() != null : key;

                                        f.movedSocketChannel((SocketChannel)key.channel());

                                        key.cancel();
                                        commitKeyCancellation();

                                        clientWorkers.get(f.toIndex()).offer(f);
                                    }
                                    else
                                        f.onDone(false);
                                }

                                break;
                            }

                            case REQUIRE_WRITE: {
                                registerWrite((GridSelectorNioSessionImpl)req0.session());

                                break;
                            }

                            case CLOSE: {
                                NioOperationFuture req = (NioOperationFuture)req0;

                                if (close(req.session(), null))
                                    req.onDone(true);
                                else
                                    req.onDone(false);

                                break;
                            }

                            case PAUSE_READ: {
                                NioOperationFuture req = (NioOperationFuture)req0;

                                SelectionKey key = req.session().key();

                                if (key.isValid()) {
                                    key.interestOps(key.interestOps() & (~SelectionKey.OP_READ));

                                    GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();

                                    ses.readsPaused(true);

                                    req.onDone(true);
                                }
                                else
                                    req.onDone(false);

                                break;
                            }

                            case RESUME_READ: {
                                NioOperationFuture req = (NioOperationFuture)req0;

                                SelectionKey key = req.session().key();

                                if (key.isValid()) {
                                    key.interestOps(key.interestOps() | SelectionKey.OP_READ);

                                    GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();

                                    ses.readsPaused(false);

                                    req.onDone(true);
                                }
                                else
                                    req.onDone(false);

                                break;
                            }

                            case DUMP_STATS: {
                                NioOperationFuture req = (NioOperationFuture)req0;

                                IgnitePredicate<GridNioSession> p =
                                    req.msg instanceof IgnitePredicate ? (IgnitePredicate<GridNioSession>)req.msg : null;

                                StringBuilder sb = new StringBuilder();

                                try {
                                    dumpStats(sb, p, p != null);
                                }
                                finally {
                                    req.onDone(sb.toString());
                                }
                            }
                        }
                    }

                    for (long i = 0; i < selectorSpins && selector.selectedKeys().isEmpty(); i++) {
                        // We ignore selectNow() returned value and look at selectedKeys() size because we might
                        // call a selectNow() during session migration (to make sure the selector is deregistered
                        // before trying to re-register it again), and in such a case our selectNow() could return 0,
                        // even though the selection set is not empty.
                        selector.selectNow();

                        if (!selector.selectedKeys().isEmpty()) {
                            // Walk through the ready keys collection and process network events.
                            updateHeartbeat();

                            if (selectedKeys == null)
                                processSelectedKeys(selector.selectedKeys());
                            else
                                processSelectedKeysOptimized(selectedKeys.flip());
                        }

                        if (!changeReqs.isEmpty())
                            continue mainLoop;

                        // Just in case we do busy selects.
                        long now = U.currentTimeMillis();

                        if (now - lastIdleCheck > 2000) {
                            lastIdleCheck = now;

                            checkIdle(selector.keys());
                        }

                        if (isCancelled())
                            return;
                    }

                    // Falling to blocking select.
                    select = true;

                    try {
                        if (!changeReqs.isEmpty())
                            continue;

                        blockingSectionBegin();

                        // Wake up every 2 seconds to check if closed.

                        // We ignore select() returned value and look at selectedKeys() size because we might
                        // call a selectNow() during session migration (to make sure the selector is deregistered
                        // before trying to re-register it again), and in such a case our select() could return 0,
                        // even though the selection set is not empty.
                        selector.select(2000);

                        blockingSectionEnd();

                        if (!selector.selectedKeys().isEmpty()) {
                            // Walk through the ready keys collection and process network events.
                            if (selectedKeys == null)
                                processSelectedKeys(selector.selectedKeys());
                            else
                                processSelectedKeysOptimized(selectedKeys.flip());

                            updateHeartbeat();
                        }

                        // select() call above doesn't throw on interruption; checking it here to propagate timely.
                        if (!closed && !isCancelled.get() && Thread.interrupted())
                            throw new InterruptedException();
                    }
                    finally {
                        select = false;
                    }

                    long now = U.currentTimeMillis();

                    if (now - lastIdleCheck > 2000) {
                        lastIdleCheck = now;

                        checkIdle(selector.keys());
                    }
                }
            }
            // Ignore this exception as thread interruption is equal to 'close' call.
            catch (ClosedByInterruptException e) {
                if (log.isDebugEnabled())
                    log.debug("Closing selector due to thread interruption: " + e.getMessage());
            }
            catch (ClosedSelectorException e) {
                throw new IgniteCheckedException("Selector got closed while active.", e);
            }
            catch (IOException e) {
                throw new IgniteCheckedException("Failed to select events on selector.", e);
            }
            finally {
                if (selector.isOpen()) {
                    if (log.isDebugEnabled())
                        log.debug("Closing all connected client sockets.");

                    // Close all channels registered with selector.
                    for (SelectionKey key : selector.keys()) {
                        GridNioKeyAttachment attach = (GridNioKeyAttachment)key.attachment();

                        if (attach != null && attach.hasSession())
                            close(attach.session(), null);
                    }

                    if (log.isDebugEnabled())
                        log.debug("Closing NIO selector.");

                    U.close(selector, log);
                }
            }
        }