void readMessage()

in geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java [3053:3228]


  void readMessage(ByteBuffer peerDataBuffer, AbstractExecutor threadMonitorExecutor) {
    if (messageType == NORMAL_MSG_TYPE) {
      owner.getConduit().getStats().incMessagesBeingReceived(true, messageLength);
      try (ByteBufferInputStream bbis =
          remoteVersion == null ? new ByteBufferInputStream(peerDataBuffer)
              : new VersionedByteBufferInputStream(peerDataBuffer, remoteVersion)) {
        ReplyProcessor21.initMessageRPId();
        // add serialization stats
        long startSer = owner.getConduit().getStats().startMsgDeserialization();
        int startingPosition = peerDataBuffer.position();
        DistributionMessage msg;
        try {
          msg = (DistributionMessage) InternalDataSerializer.readDSFID(bbis);
        } catch (SerializationException e) {
          logger.info("input buffer starting position {} "
              + " current position {} limit {} capacity {} message length {}",
              startingPosition, peerDataBuffer.position(), peerDataBuffer.limit(),
              peerDataBuffer.capacity(), messageLength);
          throw e;
        }
        owner.getConduit().getStats().endMsgDeserialization(startSer);
        if (bbis.available() != 0) {
          logger.warn("Message deserialization of {} did not read {} bytes.", msg,
              bbis.available());
        }
        try {
          if (!dispatchMessage(msg, messageLength, directAck, threadMonitorExecutor)) {
            directAck = false;
          }
        } catch (MemberShunnedException e) {
          directAck = false; // don't respond
        } catch (Exception de) {
          owner.getConduit().getCancelCriterion().checkCancelInProgress(de);
          logger.fatal("Error dispatching message", de);
        } catch (ThreadDeath td) {
          throw td;
        } catch (VirtualMachineError err) {
          initiateFailure(err);
          // If this ever returns, rethrow the error. We're poisoned
          // now, so don't let this thread continue.
          throw err;
        } catch (Throwable t) {
          // Whenever you catch Error or Throwable, you must also
          // catch VirtualMachineError (see above). However, there is
          // _still_ a possibility that you are dealing with a cascading
          // error condition, so you also need to check to see if the JVM
          // is still usable:
          getCheckFailure();
          logger.fatal("Throwable dispatching message", t);
        }
      } catch (VirtualMachineError err) {
        initiateFailure(err);
        // If this ever returns, rethrow the error. We're poisoned
        // now, so don't let this thread continue.
        throw err;
      } catch (Throwable t) {
        logger.fatal("Error deserializing message", t);
        // Whenever you catch Error or Throwable, you must also
        // catch VirtualMachineError (see above). However, there is
        // _still_ a possibility that you are dealing with a cascading
        // error condition, so you also need to check to see if the JVM
        // is still usable:
        getCheckFailure();
        sendFailureReply(ReplyProcessor21.getMessageRPId(), "Error deserializing message", t,
            directAck);
        if (t instanceof ThreadDeath) {
          throw (ThreadDeath) t;
        }
        if (t instanceof CancelException) {
          if (!(t instanceof CacheClosedException)) {
            // Just log a message if we had trouble deserializing due to
            // CacheClosedException; see bug 43543
            throw (CancelException) t;
          }
        }
      } finally {
        ReplyProcessor21.clearMessageRPId();
      }
    } else if (messageType == CHUNKED_MSG_TYPE) {
      MsgDestreamer md = obtainMsgDestreamer(messageId, remoteVersion);
      owner.getConduit().getStats().incMessagesBeingReceived(md.size() == 0,
          messageLength);
      try {
        md.addChunk(peerDataBuffer, messageLength);
      } catch (IOException ex) {
        // ignored
      }
    } else /* (nioMessageType == END_CHUNKED_MSG_TYPE) */ {
      MsgDestreamer md = obtainMsgDestreamer(messageId, remoteVersion);
      owner.getConduit().getStats().incMessagesBeingReceived(md.size() == 0,
          messageLength);
      try {
        md.addChunk(peerDataBuffer, messageLength);
      } catch (IOException ex) {
        logger.fatal("Failed handling end chunk message", ex);
      }
      DistributionMessage msg = null;
      int msgLength;
      String failureMsg = null;
      Throwable failureEx = null;
      int rpId = 0;
      boolean interrupted = false;
      try {
        msg = md.getMessage();
      } catch (ClassNotFoundException ex) {
        owner.getConduit().getStats().decMessagesBeingReceived(md.size());
        failureMsg = "ClassNotFound deserializing message";
        failureEx = ex;
        rpId = md.getRPid();
        logAtInfoAndFatal(failureMsg, failureEx);
      } catch (IOException ex) {
        owner.getConduit().getStats().decMessagesBeingReceived(md.size());
        failureMsg = "IOException deserializing message";
        failureEx = ex;
        rpId = md.getRPid();
        logAtInfoAndFatal(failureMsg, failureEx);
      } catch (InterruptedException ex) {
        interrupted = true;
        owner.getConduit().getCancelCriterion().checkCancelInProgress(ex);
      } catch (VirtualMachineError err) {
        initiateFailure(err);
        // If this ever returns, rethrow the error. We're poisoned
        // now, so don't let this thread continue.
        throw err;
      } catch (Throwable ex) {
        // Whenever you catch Error or Throwable, you must also
        // catch VirtualMachineError (see above). However, there is
        // _still_ a possibility that you are dealing with a cascading
        // error condition, so you also need to check to see if the JVM
        // is still usable:
        getCheckFailure();
        owner.getConduit().getCancelCriterion().checkCancelInProgress(ex);
        owner.getConduit().getStats().decMessagesBeingReceived(md.size());
        failureMsg = "Unexpected failure deserializing message";
        failureEx = ex;
        rpId = md.getRPid();
        logAtInfoAndFatal(failureMsg, failureEx);
      } finally {
        msgLength = md.size();
        releaseMsgDestreamer(messageId, md);
        if (interrupted) {
          Thread.currentThread().interrupt();
        }
      }
      if (msg != null) {
        try {
          if (!dispatchMessage(msg, msgLength, directAck, threadMonitorExecutor)) {
            directAck = false;
          }
        } catch (MemberShunnedException e) {
          // not a member anymore - don't reply
          directAck = false;
        } catch (Exception de) {
          owner.getConduit().getCancelCriterion().checkCancelInProgress(de);
          logger.fatal("Error dispatching message", de);
        } catch (ThreadDeath td) {
          throw td;
        } catch (VirtualMachineError err) {
          initiateFailure(err);
          // If this ever returns, rethrow the error. We're poisoned
          // now, so don't let this thread continue.
          throw err;
        } catch (Throwable t) {
          // Whenever you catch Error or Throwable, you must also
          // catch VirtualMachineError (see above). However, there is
          // _still_ a possibility that you are dealing with a cascading
          // error condition, so you also need to check to see if the JVM
          // is still usable:
          getCheckFailure();
          logger.fatal("Throwable dispatching message", t);
        }
      } else if (failureEx != null) {
        sendFailureReply(rpId, failureMsg, failureEx, directAck);
      }
    }
  }