in hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java [118:208]
public void run() {
    // Selector event loop of this endpoint; it never returns. Each pass:
    //   1. waits in select() until a channel is ready or the selector is woken up,
    //   2. calls collectOutstandingWork() (presumably refills the two working lists - TODO confirm),
    //   3. starts a non-blocking connect for every address in workingPendingConnections,
    //   4. registers every channel in workingIncomingConnections and reports it as accepted,
    //   5. dispatches the selected keys: read/write readiness, accept, connect completion.
    // Any exception that escapes a pass is printed and the loop continues with the next pass.
    while (true) {
        try {
            int n = selector.select();
            collectOutstandingWork();

            // Outgoing connections requested since the previous pass.
            if (!workingPendingConnections.isEmpty()) {
                for (InetSocketAddress address : workingPendingConnections) {
                    SocketChannel channel = SocketChannel.open();
                    channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
                    channel.configureBlocking(false);
                    boolean connect;
                    try {
                        connect = channel.connect(address);
                    } catch (IOException e) {
                        // The channel was never registered, so nothing else will close it.
                        try {
                            channel.close();
                        } catch (IOException ignored) {
                            // Best effort: the connect failure is what gets reported.
                        }
                        synchronized (connectionListener) {
                            connectionListener.connectionFailure(address);
                        }
                        continue;
                    }
                    if (connect) {
                        // Connected immediately: register with no interest ops and hand over.
                        SelectionKey key = channel.register(selector, 0);
                        createConnection(key, channel);
                    } else {
                        // Connect in progress: remember the target address on the key so a
                        // later failure can be reported against it.
                        SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT);
                        key.attach(address);
                    }
                }
                workingPendingConnections.clear();
            }

            // Incoming connections handed to this thread since the previous pass.
            if (!workingIncomingConnections.isEmpty()) {
                for (SocketChannel channel : workingIncomingConnections) {
                    channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
                    channel.configureBlocking(false);
                    SelectionKey sKey = channel.register(selector, 0);
                    TCPConnection connection = new TCPConnection(TCPEndpoint.this, channel, sKey, selector);
                    sKey.attach(connection);
                    synchronized (connectionListener) {
                        connectionListener.acceptedConnection(connection);
                    }
                }
                workingIncomingConnections.clear();
            }

            if (n > 0) {
                Iterator<SelectionKey> i = selector.selectedKeys().iterator();
                while (i.hasNext()) {
                    SelectionKey key = i.next();
                    i.remove();
                    // The readiness accessors throw CancelledKeyException on a cancelled key,
                    // which would abort the whole pass; skip such keys instead.
                    if (!key.isValid()) {
                        continue;
                    }
                    SelectableChannel sc = key.channel();
                    boolean readable = key.isReadable();
                    boolean writable = key.isWritable();
                    if (readable || writable) {
                        TCPConnection connection = (TCPConnection) key.attachment();
                        try {
                            connection.getEventListener().notifyIOReady(connection, readable, writable);
                        } catch (Exception e) {
                            connection.getEventListener().notifyIOError(e);
                            connection.close();
                            continue;
                        }
                        // The listener may have closed the connection while handling the event.
                        if (!key.isValid()) {
                            continue;
                        }
                    }
                    if (key.isAcceptable()) {
                        assert sc == serverSocketChannel;
                        // accept() returns null on a non-blocking channel when no connection
                        // is actually pending.
                        SocketChannel channel = serverSocketChannel.accept();
                        if (channel != null) {
                            distributeIncomingConnection(channel);
                        }
                    } else if (key.isConnectable()) {
                        SocketChannel channel = (SocketChannel) sc;
                        boolean finishConnect = false;
                        try {
                            finishConnect = channel.finishConnect();
                        } catch (Exception e) {
                            e.printStackTrace();
                            // Pending-connect keys carry the target address as attachment.
                            InetSocketAddress address = (InetSocketAddress) key.attachment();
                            key.cancel();
                            // Release the socket of the failed connect attempt.
                            try {
                                channel.close();
                            } catch (IOException ignored) {
                                // Best effort: the connect failure is what gets reported.
                            }
                            synchronized (connectionListener) {
                                connectionListener.connectionFailure(address);
                            }
                        }
                        if (finishConnect) {
                            createConnection(key, channel);
                        }
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}