in lib/java/src/main/java/org/apache/thrift/TNonblockingMultiFetchClient.java [192:357]
public void run() {
  // Sends the request held in requestBuf to every address in servers over
  // non-blocking sockets and collects each response into recvBuf[i].
  //
  // A response is read as a 4-byte frame-size prefix followed by that many
  // bytes; recvBuf[i] ends up holding prefix + payload. A server is counted
  // as "read completed" once prefix + payload have been received in full.
  //
  // The loop ends when every server has either completed or been counted as
  // a connect error, or when the current thread is interrupted. Servers
  // whose channel is closed for another reason (invalid or oversized frame,
  // early end-of-stream, I/O error) are not counted, so in that case the
  // loop keeps waiting until interrupted.
  // NOTE(review): presumably the caller enforces a timeout by interrupting
  // this thread -- confirm against the code that schedules this task.
  long t1 = System.currentTimeMillis();

  int numTotalServers = servers.size();
  stats.setNumTotalServers(numTotalServers);

  // buffer for receiving response from servers
  recvBuf = new ByteBuffer[numTotalServers];
  // buffer for sending request
  ByteBuffer[] sendBuf = new ByteBuffer[numTotalServers];
  long[] numBytesRead = new long[numTotalServers];
  int[] frameSize = new int[numTotalServers];
  boolean[] hasReadFrameSize = new boolean[numTotalServers];

  try {
    selector = Selector.open();
  } catch (IOException ioe) {
    LOGGER.error("Selector opens error", ioe);
    return;
  }

  for (int i = 0; i < numTotalServers; i++) {
    // create buffer to send request to server.
    sendBuf[i] = requestBuf.duplicate();
    // create buffer to read response's frame size from server
    recvBuf[i] = ByteBuffer.allocate(4);
    stats.incTotalRecvBufBytes(4);

    InetSocketAddress server = servers.get(i);
    SocketChannel s = null;
    SelectionKey key = null;
    try {
      s = SocketChannel.open();
      s.configureBlocking(false);
      // now this method is non-blocking
      s.connect(server);
      key = s.register(selector, s.validOps());
      // attach index of the key
      key.attach(i);
    } catch (Exception e) {
      stats.incNumConnectErrorServers();
      LOGGER.error("Set up socket to server {} error", server, e);
      // free resource
      if (s != null) {
        try {
          s.close();
        } catch (Exception ex) {
          LOGGER.error("failed to free up socket", ex);
        }
      }
      if (key != null) {
        key.cancel();
      }
    }
  }

  // wait for events
  while (stats.getNumReadCompletedServers() + stats.getNumConnectErrorServers()
      < stats.getNumTotalServers()) {
    // if the thread is interrupted (e.g., task is cancelled)
    if (Thread.currentThread().isInterrupted()) {
      return;
    }

    try {
      selector.select();
    } catch (Exception e) {
      LOGGER.error("Selector selects error", e);
      if (!selector.isOpen()) {
        // A closed selector fails every select(); retrying would spin forever.
        return;
      }
      continue;
    }

    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
    while (it.hasNext()) {
      SelectionKey selKey = it.next();
      it.remove();

      // get previously attached index
      int index = (Integer) selKey.attachment();

      if (selKey.isValid() && selKey.isConnectable()) {
        // if this socket throws an exception (e.g., connection refused),
        // print error msg and skip it.
        try {
          SocketChannel sChannel = (SocketChannel) selKey.channel();
          sChannel.finishConnect();
        } catch (Exception e) {
          stats.incNumConnectErrorServers();
          LOGGER.error("Socket {} connects to server {} error", index, servers.get(index), e);
        }
      }

      if (selKey.isValid() && selKey.isWritable()) {
        SocketChannel sChannel = (SocketChannel) selKey.channel();
        try {
          if (sendBuf[index].hasRemaining()) {
            sChannel.write(sendBuf[index]);
          }
          if (!sendBuf[index].hasRemaining()) {
            // Request fully sent: stop selecting for writability. A connected
            // socket is almost always writable, so leaving OP_WRITE set makes
            // select() return immediately and turns this loop into a busy spin.
            selKey.interestOps(selKey.interestOps() & ~SelectionKey.OP_WRITE);
          }
        } catch (Exception e) {
          LOGGER.error("Socket {} writes to server {} error", index, servers.get(index), e);
          // Close so the same failure is not re-selected and re-logged forever.
          closeChannelQuietly(sChannel);
        }
      }

      if (selKey.isValid() && selKey.isReadable()) {
        SocketChannel sChannel = (SocketChannel) selKey.channel();
        try {
          int bytesRead = sChannel.read(recvBuf[index]);
          if (bytesRead < 0) {
            // End-of-stream before a complete frame arrived. The channel would
            // stay readable forever, so close it instead of spinning on it.
            LOGGER.error(
                "Server {} closed the connection after {} bytes, before a complete frame was read",
                servers.get(index),
                numBytesRead[index]);
            sChannel.close();
            continue;
          }
          if (bytesRead > 0) {
            numBytesRead[index] += bytesRead;

            if (!hasReadFrameSize[index] && recvBuf[index].remaining() == 0) {
              // if the frame size has been read completely, then prepare
              // to read the actual frame.
              frameSize[index] = recvBuf[index].getInt(0);

              if (frameSize[index] <= 0) {
                stats.incNumInvalidFrameSize();
                LOGGER.error(
                    "Read an invalid frame size {} from {}. Does the server use TFramedTransport?",
                    frameSize[index],
                    servers.get(index));
                sChannel.close();
                continue;
              }

              // Prefix + payload, computed as long: the size comes from the
              // peer and "frameSize + 4" in int arithmetic wraps negative for
              // values near Integer.MAX_VALUE, which would bypass the limit
              // check below and reach ByteBuffer.allocate with a negative size.
              long responseBytes = (long) frameSize[index] + 4;

              if (responseBytes > stats.getMaxResponseBytes()) {
                stats.setMaxResponseBytes((int) Math.min(responseBytes, Integer.MAX_VALUE));
              }

              if (responseBytes > maxRecvBufBytesPerServer) {
                stats.incNumOverflowedRecvBuf();
                LOGGER.error(
                    "Read frame size {} from {}, total buffer size would exceed limit {}",
                    frameSize[index],
                    servers.get(index),
                    maxRecvBufBytesPerServer);
                sChannel.close();
                continue;
              }

              // reallocate buffer for actual frame data; the cast is safe
              // because responseBytes passed the limit check above
              // (assumes maxRecvBufBytesPerServer is an int -- TODO confirm).
              recvBuf[index] = ByteBuffer.allocate((int) responseBytes);
              recvBuf[index].putInt(frameSize[index]);

              stats.incTotalRecvBufBytes(frameSize[index]);
              hasReadFrameSize[index] = true;
            }

            if (hasReadFrameSize[index]
                && numBytesRead[index] >= (long) frameSize[index] + 4) {
              // has read all data
              sChannel.close();
              stats.incNumReadCompletedServers();
              long t2 = System.currentTimeMillis();
              stats.setReadTime(t2 - t1);
            }
          }
        } catch (Exception e) {
          LOGGER.error("Socket {} reads from server {} error", index, servers.get(index), e);
          // Close so the same failure is not re-selected and re-logged forever.
          closeChannelQuietly(sChannel);
        }
      }
    }
  }
}

/**
 * Closes a channel after an I/O failure, logging instead of propagating any error from the close
 * itself.
 *
 * @param channel the channel to close; closing an already-closed channel has no effect
 */
private void closeChannelQuietly(SocketChannel channel) {
  try {
    channel.close();
  } catch (IOException ex) {
    LOGGER.error("failed to free up socket", ex);
  }
}