public void run()

in lib/java/src/main/java/org/apache/thrift/TNonblockingMultiFetchClient.java [192:357]


    /**
     * Sends the request to every configured server over non-blocking sockets and collects each
     * framed response into {@code recvBuf}.
     *
     * <p>One channel per server is registered with a shared selector; the index of the server is
     * attached to its selection key. A response is expected to start with a 4-byte frame size
     * followed by that many bytes. {@code recvBuf[i]} initially holds only the 4-byte size and is
     * reallocated to {@code frameSize + 4} bytes once the size is known.
     *
     * <p>The loop ends when every server has either delivered a complete frame or been abandoned
     * (connect/read/write failure, premature end of stream, invalid or oversized frame), when the
     * thread is interrupted, or when the selector has been closed.
     */
    public void run() {
      long t1 = System.currentTimeMillis();

      int numTotalServers = servers.size();
      stats.setNumTotalServers(numTotalServers);

      // buffer for receiving response from servers
      recvBuf = new ByteBuffer[numTotalServers];
      // buffer for sending request
      ByteBuffer[] sendBuf = new ByteBuffer[numTotalServers];
      long[] numBytesRead = new long[numTotalServers];
      int[] frameSize = new int[numTotalServers];
      boolean[] hasReadFrameSize = new boolean[numTotalServers];
      // finished[i] becomes true once server i needs no further I/O, whether it succeeded or
      // failed; numFinishedServers counts those entries and drives loop termination.
      boolean[] finished = new boolean[numTotalServers];
      int numFinishedServers = 0;

      try {
        selector = Selector.open();
      } catch (IOException ioe) {
        LOGGER.error("Selector opens error", ioe);
        return;
      }

      for (int i = 0; i < numTotalServers; i++) {
        // create buffer to send request to server.
        sendBuf[i] = requestBuf.duplicate();
        // create buffer to read response's frame size from server
        recvBuf[i] = ByteBuffer.allocate(4);
        stats.incTotalRecvBufBytes(4);

        InetSocketAddress server = servers.get(i);
        SocketChannel s = null;
        SelectionKey key = null;
        try {
          s = SocketChannel.open();
          s.configureBlocking(false);
          // now this method is non-blocking
          s.connect(server);
          key = s.register(selector, s.validOps());
          // attach index of the key
          key.attach(i);
        } catch (Exception e) {
          stats.incNumConnectErrorServers();
          LOGGER.error("Set up socket to server {} error", server, e);

          // free resource
          if (s != null) {
            try {
              s.close();
            } catch (Exception ex) {
              LOGGER.error("failed to free up socket", ex);
            }
          }
          if (key != null) {
            key.cancel();
          }
          finished[i] = true;
          numFinishedServers++;
        }
      }

      // wait for events
      while (numFinishedServers < numTotalServers) {
        // if the thread is interrupted (e.g., task is cancelled)
        if (Thread.currentThread().isInterrupted()) {
          return;
        }

        try {
          selector.select();
        } catch (Exception e) {
          LOGGER.error("Selector selects error", e);
          if (!selector.isOpen()) {
            // a closed selector will fail on every retry; stop instead of spinning
            return;
          }
          continue;
        }

        Iterator<SelectionKey> it = selector.selectedKeys().iterator();
        while (it.hasNext()) {
          SelectionKey selKey = it.next();
          it.remove();

          // get previously attached index
          int index = (Integer) selKey.attachment();

          if (selKey.isValid() && selKey.isConnectable()) {
            // if this socket throws an exception (e.g., connection refused),
            // print error msg and abandon it.
            try {
              SocketChannel sChannel = (SocketChannel) selKey.channel();
              if (sChannel.finishConnect()) {
                // connection established: no longer interested in connect events
                selKey.interestOps(selKey.interestOps() & ~SelectionKey.OP_CONNECT);
              }
            } catch (Exception e) {
              stats.incNumConnectErrorServers();
              LOGGER.error("Socket {} connects to server {} error", index, servers.get(index), e);
              numFinishedServers += finishServer(selKey, finished, index);
            }
          }

          if (selKey.isValid() && selKey.isWritable()) {
            // if this socket throws an exception, print error msg and
            // abandon it: the request cannot be delivered, so no response will arrive.
            try {
              if (sendBuf[index].hasRemaining()) {
                SocketChannel sChannel = (SocketChannel) selKey.channel();
                sChannel.write(sendBuf[index]);
              }
              if (!sendBuf[index].hasRemaining()) {
                // request fully sent: drop OP_WRITE, otherwise the always-writable socket
                // makes select() return immediately on every iteration
                selKey.interestOps(selKey.interestOps() & ~SelectionKey.OP_WRITE);
              }
            } catch (Exception e) {
              LOGGER.error("Socket {} writes to server {} error", index, servers.get(index), e);
              numFinishedServers += finishServer(selKey, finished, index);
            }
          }

          if (selKey.isValid() && selKey.isReadable()) {
            // if this socket throws an exception, print error msg and
            // abandon it.
            try {
              SocketChannel sChannel = (SocketChannel) selKey.channel();
              int bytesRead = sChannel.read(recvBuf[index]);

              if (bytesRead < 0) {
                // end of stream before a complete frame was received; the channel would stay
                // readable forever, so give up on this server
                LOGGER.error(
                    "Server {} closed the connection before sending a complete response",
                    servers.get(index));
                numFinishedServers += finishServer(selKey, finished, index);
                continue;
              }

              if (bytesRead > 0) {
                numBytesRead[index] += bytesRead;

                if (!hasReadFrameSize[index] && recvBuf[index].remaining() == 0) {
                  // if the frame size has been read completely, then prepare
                  // to read the actual frame.
                  frameSize[index] = recvBuf[index].getInt(0);

                  if (frameSize[index] <= 0) {
                    stats.incNumInvalidFrameSize();
                    LOGGER.error(
                        "Read an invalid frame size {} from {}. Does the server use TFramedTransport?",
                        frameSize[index],
                        servers.get(index));
                    numFinishedServers += finishServer(selKey, finished, index);
                    continue;
                  }

                  // computed as long so that a frame size close to Integer.MAX_VALUE cannot
                  // wrap around and slip past the limit check below
                  long responseBytes = (long) frameSize[index] + 4;

                  if (responseBytes > stats.getMaxResponseBytes()) {
                    stats.setMaxResponseBytes((int) Math.min(responseBytes, Integer.MAX_VALUE));
                  }

                  if (responseBytes > maxRecvBufBytesPerServer
                      || responseBytes > Integer.MAX_VALUE) {
                    stats.incNumOverflowedRecvBuf();
                    LOGGER.error(
                        "Read frame size {} from {}, total buffer size would exceed limit {}",
                        frameSize[index],
                        servers.get(index),
                        maxRecvBufBytesPerServer);
                    numFinishedServers += finishServer(selKey, finished, index);
                    continue;
                  }

                  // reallocate buffer for actual frame data
                  recvBuf[index] = ByteBuffer.allocate((int) responseBytes);
                  recvBuf[index].putInt(frameSize[index]);

                  stats.incTotalRecvBufBytes(frameSize[index]);
                  hasReadFrameSize[index] = true;
                }

                if (hasReadFrameSize[index]
                    && numBytesRead[index] >= (long) frameSize[index] + 4) {
                  // has read all data
                  stats.incNumReadCompletedServers();
                  long t2 = System.currentTimeMillis();
                  stats.setReadTime(t2 - t1);
                  numFinishedServers += finishServer(selKey, finished, index);
                }
              }
            } catch (Exception e) {
              LOGGER.error("Socket {} reads from server {} error", index, servers.get(index), e);
              numFinishedServers += finishServer(selKey, finished, index);
            }
          }
        }
      }
    }

    /**
     * Cancels the key, closes its channel (close failures are logged, not propagated) and marks
     * the server at {@code index} as finished.
     *
     * @param selKey selection key of the server's channel
     * @param finished per-server completion flags, updated in place
     * @param index index of the server the key belongs to
     * @return 1 if the server was newly marked as finished, 0 if it already was
     */
    private int finishServer(SelectionKey selKey, boolean[] finished, int index) {
      // cancel explicitly so the key is invalid even if closing the channel fails
      selKey.cancel();
      try {
        selKey.channel().close();
      } catch (Exception ex) {
        LOGGER.error("failed to free up socket", ex);
      }
      if (finished[index]) {
        return 0;
      }
      finished[index] = true;
      return 1;
    }