in geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java [3053:3228]
/**
 * Handles one message frame contained in {@code peerDataBuffer}, using this connection's current
 * {@code messageType}, {@code messageLength} and {@code messageId} fields.
 *
 * <ul>
 * <li>{@code NORMAL_MSG_TYPE}: the buffer holds a whole message, which is deserialized and
 * dispatched immediately.</li>
 * <li>{@code CHUNKED_MSG_TYPE}: the buffer holds one piece of a larger message and is appended to
 * the {@link MsgDestreamer} obtained for {@code messageId}.</li>
 * <li>any other type (treated as the end-of-chunks marker): the last piece is appended, the
 * message is taken from the destreamer and dispatched.</li>
 * </ul>
 *
 * @param peerDataBuffer buffer positioned at the frame payload
 * @param threadMonitorExecutor passed through unchanged to {@code dispatchMessage}
 */
void readMessage(ByteBuffer peerDataBuffer, AbstractExecutor threadMonitorExecutor) {
  if (messageType == NORMAL_MSG_TYPE) {
    readNormalMessage(peerDataBuffer, threadMonitorExecutor);
  } else if (messageType == CHUNKED_MSG_TYPE) {
    appendMessageChunk(peerDataBuffer);
  } else /* (nioMessageType == END_CHUNKED_MSG_TYPE) */ {
    readFinalChunkAndDispatch(peerDataBuffer, threadMonitorExecutor);
  }
}

/**
 * Deserializes a complete (unchunked) message straight from {@code peerDataBuffer} and dispatches
 * it. Any failure that reaches the outer handlers is logged and reported to the sender via
 * {@code sendFailureReply}; the reply-processor id set up by
 * {@code ReplyProcessor21.initMessageRPId()} is always cleared on the way out.
 */
private void readNormalMessage(ByteBuffer peerDataBuffer,
    AbstractExecutor threadMonitorExecutor) {
  owner.getConduit().getStats().incMessagesBeingReceived(true, messageLength);
  // Use the version-aware stream only when the remote version is known.
  try (ByteBufferInputStream bbis =
      remoteVersion == null ? new ByteBufferInputStream(peerDataBuffer)
          : new VersionedByteBufferInputStream(peerDataBuffer, remoteVersion)) {
    ReplyProcessor21.initMessageRPId();
    // add serialization stats
    long startSer = owner.getConduit().getStats().startMsgDeserialization();
    int startingPosition = peerDataBuffer.position();
    DistributionMessage msg;
    try {
      msg = (DistributionMessage) InternalDataSerializer.readDSFID(bbis);
    } catch (SerializationException e) {
      // Record the buffer geometry to help diagnose the failure, then let the outer handler
      // below do the actual failure processing.
      logger.info("input buffer starting position {} "
          + " current position {} limit {} capacity {} message length {}",
          startingPosition, peerDataBuffer.position(), peerDataBuffer.limit(),
          peerDataBuffer.capacity(), messageLength);
      throw e;
    }
    owner.getConduit().getStats().endMsgDeserialization(startSer);
    if (bbis.available() != 0) {
      logger.warn("Message deserialization of {} did not read {} bytes.", msg,
          bbis.available());
    }
    dispatchAndGuard(msg, messageLength, threadMonitorExecutor);
  } catch (VirtualMachineError err) {
    initiateFailure(err);
    // If this ever returns, rethrow the error. We're poisoned
    // now, so don't let this thread continue.
    throw err;
  } catch (Throwable t) {
    // Reached for deserialization failures and also for anything that escapes
    // dispatchAndGuard (ThreadDeath, or whatever checkCancelInProgress throws).
    logger.fatal("Error deserializing message", t);
    // Whenever you catch Error or Throwable, you must also
    // catch VirtualMachineError (see above). However, there is
    // _still_ a possibility that you are dealing with a cascading
    // error condition, so you also need to check to see if the JVM
    // is still usable:
    getCheckFailure();
    sendFailureReply(ReplyProcessor21.getMessageRPId(), "Error deserializing message", t,
        directAck);
    if (t instanceof ThreadDeath) {
      throw (ThreadDeath) t;
    }
    // A CacheClosedException is only logged (above); see bug 43543.
    // Every other CancelException is propagated to the caller.
    if (t instanceof CancelException && !(t instanceof CacheClosedException)) {
      throw (CancelException) t;
    }
  } finally {
    ReplyProcessor21.clearMessageRPId();
  }
}

/**
 * Appends an intermediate chunk of a multi-part message to the destreamer registered for
 * {@code messageId}.
 */
private void appendMessageChunk(ByteBuffer peerDataBuffer) {
  MsgDestreamer md = obtainMsgDestreamer(messageId, remoteVersion);
  owner.getConduit().getStats().incMessagesBeingReceived(md.size() == 0,
      messageLength);
  try {
    md.addChunk(peerDataBuffer, messageLength);
  } catch (IOException ex) {
    // Deliberately not propagated, but leave a trace instead of dropping it silently.
    // NOTE(review): presumably the problem resurfaces when the final chunk is assembled -
    // confirm against MsgDestreamer.
    logger.debug("Failed handling chunk message", ex);
  }
}

/**
 * Appends the last chunk of a multi-part message, obtains the reassembled message from the
 * destreamer and dispatches it. If the message could not be produced because of an exception
 * other than an interrupt, a failure reply is sent instead. The destreamer is always released.
 */
private void readFinalChunkAndDispatch(ByteBuffer peerDataBuffer,
    AbstractExecutor threadMonitorExecutor) {
  MsgDestreamer md = obtainMsgDestreamer(messageId, remoteVersion);
  owner.getConduit().getStats().incMessagesBeingReceived(md.size() == 0,
      messageLength);
  try {
    md.addChunk(peerDataBuffer, messageLength);
  } catch (IOException ex) {
    logger.fatal("Failed handling end chunk message", ex);
  }
  DistributionMessage msg = null;
  int msgLength;
  String failureMsg = null;
  Throwable failureEx = null;
  int rpId = 0;
  boolean interrupted = false;
  try {
    msg = md.getMessage();
  } catch (ClassNotFoundException ex) {
    owner.getConduit().getStats().decMessagesBeingReceived(md.size());
    failureMsg = "ClassNotFound deserializing message";
    failureEx = ex;
    rpId = md.getRPid();
    logAtInfoAndFatal(failureMsg, failureEx);
  } catch (IOException ex) {
    owner.getConduit().getStats().decMessagesBeingReceived(md.size());
    failureMsg = "IOException deserializing message";
    failureEx = ex;
    rpId = md.getRPid();
    logAtInfoAndFatal(failureMsg, failureEx);
  } catch (InterruptedException ex) {
    // Remember the interrupt so the finally block can restore the thread's interrupt status
    // even if the cancellation check throws.
    interrupted = true;
    owner.getConduit().getCancelCriterion().checkCancelInProgress(ex);
  } catch (VirtualMachineError err) {
    initiateFailure(err);
    // If this ever returns, rethrow the error. We're poisoned
    // now, so don't let this thread continue.
    throw err;
  } catch (Throwable ex) {
    // Whenever you catch Error or Throwable, you must also
    // catch VirtualMachineError (see above). However, there is
    // _still_ a possibility that you are dealing with a cascading
    // error condition, so you also need to check to see if the JVM
    // is still usable:
    getCheckFailure();
    owner.getConduit().getCancelCriterion().checkCancelInProgress(ex);
    owner.getConduit().getStats().decMessagesBeingReceived(md.size());
    failureMsg = "Unexpected failure deserializing message";
    failureEx = ex;
    rpId = md.getRPid();
    logAtInfoAndFatal(failureMsg, failureEx);
  } finally {
    // Capture the size before the destreamer is handed back.
    msgLength = md.size();
    releaseMsgDestreamer(messageId, md);
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }
  if (msg != null) {
    dispatchAndGuard(msg, msgLength, threadMonitorExecutor);
  } else if (failureEx != null) {
    sendFailureReply(rpId, failureMsg, failureEx, directAck);
  }
}

/**
 * Hands {@code msg} to {@code dispatchMessage} and contains the failures that may result.
 * {@code directAck} is cleared when dispatch returns {@code false} or the sender has been
 * shunned. {@code ThreadDeath} and {@code VirtualMachineError} are rethrown; other throwables
 * are logged at fatal level after the cancellation / JVM-health checks (which may themselves
 * throw).
 *
 * @param msg the deserialized message to dispatch
 * @param length the message length reported to {@code dispatchMessage}
 * @param threadMonitorExecutor passed through unchanged to {@code dispatchMessage}
 */
private void dispatchAndGuard(DistributionMessage msg, int length,
    AbstractExecutor threadMonitorExecutor) {
  try {
    if (!dispatchMessage(msg, length, directAck, threadMonitorExecutor)) {
      directAck = false;
    }
  } catch (MemberShunnedException e) {
    // not a member anymore - don't reply
    directAck = false;
  } catch (Exception de) {
    owner.getConduit().getCancelCriterion().checkCancelInProgress(de);
    logger.fatal("Error dispatching message", de);
  } catch (ThreadDeath td) {
    throw td;
  } catch (VirtualMachineError err) {
    initiateFailure(err);
    // If this ever returns, rethrow the error. We're poisoned
    // now, so don't let this thread continue.
    throw err;
  } catch (Throwable t) {
    // Whenever you catch Error or Throwable, you must also
    // catch VirtualMachineError (see above). However, there is
    // _still_ a possibility that you are dealing with a cascading
    // error condition, so you also need to check to see if the JVM
    // is still usable:
    getCheckFailure();
    logger.fatal("Throwable dispatching message", t);
  }
}