in modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java [2042:2331]
/**
 * Runs the selector loop of this worker.
 * <p>
 * The loop keeps going while {@code closed} is not set and the selector is open. Every iteration:
 * <ol>
 * <li>drains {@code changeReqs} and applies each session change request on this thread;</li>
 * <li>polls the selector with {@code selectNow()} for up to {@code selectorSpins} attempts, stopping
 * as soon as the selected-key set is non-empty, and returning to step 1 as soon as a new change
 * request is queued;</li>
 * <li>falls back to a blocking {@code select(2000)};</li>
 * <li>calls {@code checkIdle()} on the registered keys once more than 2000 ms have passed since
 * the previous check.</li>
 * </ol>
 * On any exit path, if the selector is still open, every session attached to a registered key is
 * closed and then the selector itself is closed.
 *
 * @throws IgniteCheckedException If the selector is closed while the loop is active or if an I/O error
 *      is raised inside the loop (both are wrapped here). Methods called from the loop may presumably
 *      throw it as well - their signatures are outside this block.
 * @throws InterruptedException If the thread interrupt flag is found set after the blocking select
 *      while the worker is neither closed nor cancelled.
 */
private void bodyInternal() throws IgniteCheckedException, InterruptedException {
try {
// Time of the last checkIdle() call; shared by the spinning and the blocking phases below.
long lastIdleCheck = U.currentTimeMillis();
mainLoop:
while (!closed && selector.isOpen()) {
SessionChangeRequest req0;
updateHeartbeat();
// Phase 1: apply all queued change requests. The heartbeat is refreshed per request.
while ((req0 = changeReqs.poll()) != null) {
updateHeartbeat();
switch (req0.operation()) {
// Register the future's channel for OP_CONNECT with the future itself as the key attachment.
case CONNECT: {
NioOperationFuture fut = (NioOperationFuture)req0;
SocketChannel ch = fut.socketChannel();
try {
ch.register(selector, SelectionKey.OP_CONNECT, fut);
}
catch (IOException e) {
// A registration failure only fails this future; the loop keeps running.
fut.onDone(new IgniteCheckedException("Failed to register channel on selector", e));
}
break;
}
// Cancel the key (if the channel is registered with this selector), close the channel and
// complete the request.
case CANCEL_CONNECT: {
NioOperationFuture req = (NioOperationFuture)req0;
SocketChannel ch = req.socketChannel();
SelectionKey key = ch.keyFor(selector);
if (key != null)
key.cancel();
U.closeQuiet(ch);
req.onDone();
break;
}
case REGISTER: {
register((NioOperationFuture)req0);
break;
}
// Session move between workers. The same future is handled twice: first by the worker that
// currently owns the session (else-branch), which then offers it to the target worker (if-branch).
case MOVE: {
SessionMoveFuture f = (SessionMoveFuture)req0;
GridSelectorNioSessionImpl ses = f.session();
if (idx == f.toIdx) {
// This worker is the move target: the previous owner must have stored the channel in the future.
assert f.movedSocketChannel() != null : f;
boolean add = workerSessions.add(ses);
assert add;
ses.finishMoveSession(this);
// Workers with an even index count towards readerMoveCnt, odd ones towards writerMoveCnt.
if (idx % 2 == 0)
readerMoveCnt.incrementAndGet();
else
writerMoveCnt.incrementAndGet();
// Register the moved channel with this worker's selector; the session is the key attachment.
SelectionKey key = f.movedSocketChannel().register(selector,
SelectionKey.OP_READ | SelectionKey.OP_WRITE,
ses);
ses.key(key);
ses.procWrite.set(true);
f.onDone(true);
}
else {
// This worker is not the target, so the channel must not have been detached yet.
assert f.movedSocketChannel() == null : f;
if (workerSessions.remove(ses)) {
ses.startMoveSession(this);
SelectionKey key = ses.key();
assert key.channel() != null : key;
// Hand the channel over through the future and drop it from this selector.
f.movedSocketChannel((SocketChannel)key.channel());
key.cancel();
// NOTE(review): presumably makes the selector process the cancelled key right away (see the
// selectNow() remark in the select comments below) - confirm against commitKeyCancellation().
commitKeyCancellation();
clientWorkers.get(f.toIndex()).offer(f);
}
else
// The session is not served by this worker, nothing to move.
f.onDone(false);
}
break;
}
case REQUIRE_WRITE: {
registerWrite((GridSelectorNioSessionImpl)req0.session());
break;
}
// Complete the request with the outcome reported by close().
case CLOSE: {
NioOperationFuture req = (NioOperationFuture)req0;
if (close(req.session(), null))
req.onDone(true);
else
req.onDone(false);
break;
}
// Remove OP_READ from the key's interest set and flag the session as read-paused.
// The request completes with false if the key is no longer valid.
case PAUSE_READ: {
NioOperationFuture req = (NioOperationFuture)req0;
SelectionKey key = req.session().key();
if (key.isValid()) {
key.interestOps(key.interestOps() & (~SelectionKey.OP_READ));
GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
ses.readsPaused(true);
req.onDone(true);
}
else
req.onDone(false);
break;
}
// Mirror of PAUSE_READ: add OP_READ back and clear the read-paused flag.
case RESUME_READ: {
NioOperationFuture req = (NioOperationFuture)req0;
SelectionKey key = req.session().key();
if (key.isValid()) {
key.interestOps(key.interestOps() | SelectionKey.OP_READ);
GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
ses.readsPaused(false);
req.onDone(true);
}
else
req.onDone(false);
break;
}
// Last case of the switch, hence no trailing break.
case DUMP_STATS: {
NioOperationFuture req = (NioOperationFuture)req0;
// Optional session filter carried in the request message (unchecked cast); null when absent.
IgnitePredicate<GridNioSession> p =
req.msg instanceof IgnitePredicate ? (IgnitePredicate<GridNioSession>)req.msg : null;
StringBuilder sb = new StringBuilder();
try {
dumpStats(sb, p, p != null);
}
finally {
// Complete the future with whatever was collected, even if dumpStats() threw.
req.onDone(sb.toString());
}
}
}
}
// Phase 2: non-blocking polling, at most selectorSpins attempts while nothing is selected.
for (long i = 0; i < selectorSpins && selector.selectedKeys().isEmpty(); i++) {
// We ignore selectNow() return value and look at selectedKeys() size because we might
// call a selectNow() during session migration (to make sure the selector is deregistered
// before trying to re-register it again), and in such a case our selectNow() could return 0,
// even though the selection set is not empty.
selector.selectNow();
if (!selector.selectedKeys().isEmpty()) {
// Walk through the ready keys collection and process network events.
updateHeartbeat();
// selectedKeys is an alternative key container; when it is null the selector's own set is used.
if (selectedKeys == null)
processSelectedKeys(selector.selectedKeys());
else
processSelectedKeysOptimized(selectedKeys.flip());
}
// New change requests are handled before any further polling.
if (!changeReqs.isEmpty())
continue mainLoop;
// Just in case we do busy selects.
long now = U.currentTimeMillis();
if (now - lastIdleCheck > 2000) {
lastIdleCheck = now;
checkIdle(selector.keys());
}
// Leaves the method; the outer finally block still closes sessions and the selector.
if (isCancelled())
return;
}
// Falling to blocking select.
// NOTE(review): 'select' is raised before changeReqs is re-checked; presumably request producers
// read it to decide whether the selector must be woken up - confirm at the offer() site.
select = true;
try {
// Skip blocking if a request arrived meanwhile; the finally block below still resets 'select'.
if (!changeReqs.isEmpty())
continue;
blockingSectionBegin();
// Wake up every 2 seconds to check if closed.
// We ignore select() return value and look at selectedKeys() size because we might
// call a selectNow() during session migration (to make sure the selector is deregistered
// before trying to re-register it again), and in such a case our select() could return 0,
// even though the selection set is not empty.
selector.select(2000);
// Not called if select() throws: there is no finally around the blocking section.
blockingSectionEnd();
if (!selector.selectedKeys().isEmpty()) {
// Walk through the ready keys collection and process network events.
if (selectedKeys == null)
processSelectedKeys(selector.selectedKeys());
else
processSelectedKeysOptimized(selectedKeys.flip());
updateHeartbeat();
}
// select() call above doesn't throw on interruption; checking it here to propagate timely.
// Thread.interrupted() clears the flag, but it is evaluated only if neither closed nor cancelled.
if (!closed && !isCancelled.get() && Thread.interrupted())
throw new InterruptedException();
}
finally {
select = false;
}
// Same idle check as in the spinning phase, performed after the blocking select.
long now = U.currentTimeMillis();
if (now - lastIdleCheck > 2000) {
lastIdleCheck = now;
checkIdle(selector.keys());
}
}
}
// Ignore this exception as thread interruption is equal to 'close' call.
catch (ClosedByInterruptException e) {
if (log.isDebugEnabled())
log.debug("Closing selector due to thread interruption: " + e.getMessage());
}
// Must stay after ClosedByInterruptException and before IOException: each gets its own handling.
catch (ClosedSelectorException e) {
throw new IgniteCheckedException("Selector got closed while active.", e);
}
catch (IOException e) {
throw new IgniteCheckedException("Failed to select events on selector.", e);
}
finally {
// Cleanup for every exit path (normal end, cancellation, exception); skipped if the selector
// has already been closed.
if (selector.isOpen()) {
if (log.isDebugEnabled())
log.debug("Closing all connected client sockets.");
// Close all channels registered with selector.
for (SelectionKey key : selector.keys()) {
GridNioKeyAttachment attach = (GridNioKeyAttachment)key.attachment();
// Only attachments that report a session are closed here; others are skipped.
if (attach != null && attach.hasSession())
close(attach.session(), null);
}
if (log.isDebugEnabled())
log.debug("Closing NIO selector.");
U.close(selector, log);
}
}
}