private void readMessages()

in geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java [1564:1756]


  private void readMessages() {
    // take a snapshot of uniqueId to detect reconnect attempts
    SocketChannel channel;
    try {
      channel = getSocket().getChannel();
      socket.setSoTimeout(0);
      socket.setTcpNoDelay(true);
      if (ioFilter == null) {
        createIoFilter(channel);
      }
      channel.configureBlocking(true);
    } catch (ClosedChannelException e) {
      // the channel was asynchronously closed. Our work is done.
      try {
        requestClose("readMessages caught closed channel");
      } catch (Exception ignore) {
      }
      // exit loop and thread
      return;
    } catch (IOException ex) {
      if (stopped || owner.getConduit().getCancelCriterion().isCancelInProgress()) {
        try {
          requestClose("readMessages caught shutdown");
        } catch (Exception ignore) {
        }
        // exit loop (and thread)
        return;
      }
      logger.info("Failed initializing socket for message {}: {}",
          isReceiver ? "receiver" : "sender", ex.getMessage());
      readerShuttingDown = true;
      try {
        requestClose(format("Failed initializing socket %s", ex));
      } catch (Exception ignore) {
      }
      return;
    }

    if (!stopped) {
      if (logger.isDebugEnabled()) {
        logger.debug("Starting {} on {}", p2pReaderName(), socket);
      }
    }

    // we should not change the state of the connection if we are a handshake reader thread
    // as there is a race between this thread and the application thread doing direct ack
    boolean handshakeHasBeenRead = false;
    final ThreadsMonitoring threadMonitoring = getThreadMonitoring();
    final AbstractExecutor threadMonitorExecutor =
        threadMonitoring.createAbstractExecutor(P2PReaderExecutor);
    threadMonitorExecutor.suspendMonitoring();
    threadMonitoring.register(threadMonitorExecutor);
    try {
      for (boolean isInitialRead = true;;) {
        if (stopped) {
          break;
        }
        if (SystemFailure.getFailure() != null) {
          // Allocate no objects here!
          try {
            ioFilter.close(socket.getChannel());
            socket.close();
          } catch (IOException e) {
            // don't care
          }
          getCheckFailure();
        }
        if (owner.getConduit().getCancelCriterion().isCancelInProgress()) {
          break;
        }

        try (final ByteBufferSharing inputSharing = inputBufferVendor.open()) {
          ByteBuffer buff = inputSharing.getBuffer();

          synchronized (stateLock) {
            connectionState = STATE_READING;
          }
          int amountRead;
          if (!isInitialRead) {
            amountRead = channel.read(buff);
          } else {
            isInitialRead = false;
            // if we're using SSL/TLS the input buffer may already have data to process
            final boolean skipInitialRead = buff.position() > 0;
            if (!skipInitialRead) {
              amountRead = channel.read(buff);
            } else {
              amountRead = buff.position();
            }
          }
          synchronized (stateLock) {
            connectionState = STATE_IDLE;
          }
          if (amountRead == 0) {
            continue;
          }
          if (amountRead < 0) {
            readerShuttingDown = true;
            try {
              requestClose("SocketChannel.read returned EOF");
            } catch (Exception e) {
              // ignore - shutting down
            }
            return;
          }
          processInputBuffer(threadMonitorExecutor);

          if (!handshakeHasBeenRead && !isReceiver && (handshakeRead || handshakeCancelled)) {
            if (logger.isDebugEnabled()) {
              if (handshakeRead) {
                logger.debug("handshake has been read {}", this);
              } else {
                logger.debug("handshake has been cancelled {}", this);
              }
            }
            handshakeHasBeenRead = true;

            // Once we have read the handshake for unshared connections, the reader can skip
            // processing messages
            if (!sharedResource || asyncMode) {
              break;
            } else {
              // not exiting and not a Reader spawned from a ServerSocket.accept(), so
              // let's set some state noting that this is happening
              hasResidualReaderThread = true;
            }

          }
        } catch (CancelException e) {
          if (logger.isDebugEnabled()) {
            logger.debug("{} Terminated <{}> due to cancellation", p2pReaderName(), this, e);
          }
          readerShuttingDown = true;
          try {
            requestClose(format("CacheClosed in channel read: %s", e));
          } catch (Exception ignored) {
          }
          return;
        } catch (ClosedChannelException e) {
          readerShuttingDown = true;
          try {
            requestClose(format("ClosedChannelException in channel read: %s", e));
          } catch (Exception ignored) {
          }
          return;
        } catch (IOException e) {
          // "Socket closed" check needed for Solaris jdk 1.4.2_08
          if (!isSocketClosed() && !"Socket closed".equalsIgnoreCase(e.getMessage())) {
            if (logger.isInfoEnabled() && !isIgnorableIOException(e)) {
              logger.info("{} io exception for {}", p2pReaderName(), this, e);
            }
            if (logger.isDebugEnabled()) {
              if (e.getMessage().contains("interrupted by a call to WSACancelBlockingCall")) {
                logger.debug(
                    "{} received unexpected WSACancelBlockingCall exception, which may result in a hang",
                    p2pReaderName());
              }
            }
          }
          readerShuttingDown = true;
          try {
            requestClose(format("IOException in channel read: %s", e));
          } catch (Exception ignored) {
          }
          return;

        } catch (Exception e) {
          owner.getConduit().getCancelCriterion().checkCancelInProgress(e);
          if (!stopped && !isSocketClosed()) {
            logger.fatal(format("%s exception in channel read", p2pReaderName()), e);
          }
          readerShuttingDown = true;
          try {
            requestClose(format("%s exception in channel read", e));
          } catch (Exception ignored) {
          }
          return;
        }
      }
    } finally {
      threadMonitoring.unregister(threadMonitorExecutor);
      hasResidualReaderThread = false;
      if (!handshakeHasBeenRead || (sharedResource && !asyncMode)) {
        synchronized (stateLock) {
          connectionState = STATE_IDLE;
        }
      }
      if (logger.isDebugEnabled()) {
        logger.debug("readMessages terminated id={} from {} isHandshakeReader={}", conduitIdStr,
            remoteMember, handshakeHasBeenRead);
      }
    }
  }