in hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java [187:328]
public void run() {
try {
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (ClosedChannelException e) {
throw new RuntimeException(e);
}
BitSet unsentMessagesBitmap = new BitSet();
List<Message> tempUnsentMessages = new ArrayList<Message>();
while (!stopped) {
try {
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.fine("Starting Select");
}
int n = selector.select();
collectOutstandingWork();
if (!workingPendingConnections.isEmpty()) {
for (IPCHandle handle : workingPendingConnections) {
SocketChannel channel = SocketChannel.open();
channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
channel.configureBlocking(false);
SelectionKey cKey = null;
if (channel.connect(handle.getRemoteAddress())) {
cKey = channel.register(selector, SelectionKey.OP_READ);
handle.setState(HandleState.CONNECT_SENT);
write(createInitialReqMessage(handle));
} else {
cKey = channel.register(selector, SelectionKey.OP_CONNECT);
}
handle.setKey(cKey);
cKey.attach(handle);
}
workingPendingConnections.clear();
}
if (!workingSendList.isEmpty()) {
unsentMessagesBitmap.clear();
int len = workingSendList.size();
for (int i = 0; i < len; ++i) {
Message msg = workingSendList.get(i);
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.fine("Processing send of message: " + msg);
}
IPCHandle handle = msg.getIPCHandle();
if (handle.getState() != HandleState.CLOSED) {
if (!handle.full()) {
while (true) {
ByteBuffer buffer = handle.getOutBuffer();
buffer.compact();
boolean success = msg.write(buffer);
buffer.flip();
if (success) {
system.getPerformanceCounters().addMessageSentCount(1);
SelectionKey key = handle.getKey();
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
} else {
if (!buffer.hasRemaining()) {
handle.resizeOutBuffer();
continue;
}
handle.markFull();
unsentMessagesBitmap.set(i);
}
break;
}
} else {
unsentMessagesBitmap.set(i);
}
}
}
copyUnsentMessages(unsentMessagesBitmap, tempUnsentMessages);
}
if (n > 0) {
for (Iterator<SelectionKey> i = selector.selectedKeys().iterator(); i.hasNext();) {
SelectionKey key = i.next();
i.remove();
SelectableChannel sc = key.channel();
if (key.isReadable()) {
SocketChannel channel = (SocketChannel) sc;
IPCHandle handle = (IPCHandle) key.attachment();
ByteBuffer readBuffer = handle.getInBuffer();
int len = channel.read(readBuffer);
system.getPerformanceCounters().addMessageBytesReceived(len);
if (len < 0) {
key.cancel();
channel.close();
handle.close();
} else {
handle.processIncomingMessages();
if (!readBuffer.hasRemaining()) {
handle.resizeInBuffer();
}
}
} else if (key.isWritable()) {
SocketChannel channel = (SocketChannel) sc;
IPCHandle handle = (IPCHandle) key.attachment();
ByteBuffer writeBuffer = handle.getOutBuffer();
int len = channel.write(writeBuffer);
system.getPerformanceCounters().addMessageBytesSent(len);
if (len < 0) {
key.cancel();
channel.close();
handle.close();
} else if (!writeBuffer.hasRemaining()) {
key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
}
if (handle.full()) {
handle.clearFull();
selector.wakeup();
}
} else if (key.isAcceptable()) {
assert sc == serverSocketChannel;
SocketChannel channel = serverSocketChannel.accept();
channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
channel.configureBlocking(false);
IPCHandle handle = new IPCHandle(system, null);
SelectionKey cKey = channel.register(selector, SelectionKey.OP_READ);
handle.setKey(cKey);
cKey.attach(handle);
handle.setState(HandleState.CONNECT_RECEIVED);
} else if (key.isConnectable()) {
SocketChannel channel = (SocketChannel) sc;
IPCHandle handle = (IPCHandle) key.attachment();
try {
if (!channel.finishConnect()) {
throw new Exception("Connection did not finish");
}
} catch (Exception e) {
e.printStackTrace();
handle.setState(HandleState.CONNECT_FAILED);
continue;
}
handle.setState(HandleState.CONNECT_SENT);
registerHandle(handle);
key.interestOps(SelectionKey.OP_READ);
write(createInitialReqMessage(handle));
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}