in geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java [1564:1756]
/**
 * Reader loop for this connection.
 *
 * <p>
 * Puts the socket into blocking mode (no read timeout, TCP_NODELAY on), creating the I/O filter
 * if none exists yet, and then repeatedly reads from the channel into the shared input buffer
 * and passes control to {@code processInputBuffer}. The loop ends when:
 * <ul>
 * <li>{@code stopped} is set or the conduit's cancel criterion reports cancellation,</li>
 * <li>the channel reports EOF or any exception is raised while reading/processing (in which
 * case {@code readerShuttingDown} is set and a close is requested), or</li>
 * <li>this is a sender-side reader, the handshake has been read or cancelled, and the connection
 * is either not a shared resource or is in async mode.</li>
 * </ul>
 *
 * <p>
 * While reading, {@code connectionState} is set to {@code STATE_READING} under
 * {@code stateLock} and reset to {@code STATE_IDLE} after each read. The thread is registered
 * with thread monitoring for the lifetime of the loop and always unregistered on exit.
 */
private void readMessages() {
  SocketChannel channel;
  try {
    channel = getSocket().getChannel();
    socket.setSoTimeout(0);
    socket.setTcpNoDelay(true);
    if (ioFilter == null) {
      createIoFilter(channel);
    }
    channel.configureBlocking(true);
  } catch (ClosedChannelException e) {
    // the channel was asynchronously closed. Our work is done.
    try {
      requestClose("readMessages caught closed channel");
    } catch (Exception ignore) {
      // best effort - we are exiting anyway
    }
    // exit loop and thread
    return;
  } catch (IOException ex) {
    if (stopped || owner.getConduit().getCancelCriterion().isCancelInProgress()) {
      try {
        requestClose("readMessages caught shutdown");
      } catch (Exception ignore) {
        // best effort - we are shutting down
      }
      // exit loop (and thread)
      return;
    }
    logger.info("Failed initializing socket for message {}: {}",
        isReceiver ? "receiver" : "sender", ex.getMessage());
    readerShuttingDown = true;
    try {
      requestClose(format("Failed initializing socket %s", ex));
    } catch (Exception ignore) {
      // best effort - the socket could not be initialized
    }
    return;
  }

  if (!stopped && logger.isDebugEnabled()) {
    logger.debug("Starting {} on {}", p2pReaderName(), socket);
  }

  // we should not change the state of the connection if we are a handshake reader thread
  // as there is a race between this thread and the application thread doing direct ack
  boolean handshakeHasBeenRead = false;
  final ThreadsMonitoring threadMonitoring = getThreadMonitoring();
  final AbstractExecutor threadMonitorExecutor =
      threadMonitoring.createAbstractExecutor(P2PReaderExecutor);
  // the executor is suspended before it is registered
  threadMonitorExecutor.suspendMonitoring();
  threadMonitoring.register(threadMonitorExecutor);
  try {
    for (boolean isInitialRead = true;;) {
      if (stopped) {
        break;
      }
      if (SystemFailure.getFailure() != null) {
        // Allocate no objects here!
        try {
          ioFilter.close(socket.getChannel());
          socket.close();
        } catch (IOException e) {
          // don't care
        }
        // NOTE(review): presumably propagates the recorded system failure - confirm
        getCheckFailure();
      }
      if (owner.getConduit().getCancelCriterion().isCancelInProgress()) {
        break;
      }
      try (final ByteBufferSharing inputSharing = inputBufferVendor.open()) {
        ByteBuffer buff = inputSharing.getBuffer();
        synchronized (stateLock) {
          connectionState = STATE_READING;
        }
        int amountRead;
        if (!isInitialRead) {
          amountRead = channel.read(buff);
        } else {
          isInitialRead = false;
          // if we're using SSL/TLS the input buffer may already have data to process
          final boolean skipInitialRead = buff.position() > 0;
          if (!skipInitialRead) {
            amountRead = channel.read(buff);
          } else {
            amountRead = buff.position();
          }
        }
        synchronized (stateLock) {
          connectionState = STATE_IDLE;
        }
        if (amountRead == 0) {
          // nothing was read; go around again (re-checking stop/cancel conditions)
          continue;
        }
        if (amountRead < 0) {
          readerShuttingDown = true;
          try {
            requestClose("SocketChannel.read returned EOF");
          } catch (Exception e) {
            // ignore - shutting down
          }
          return;
        }
        processInputBuffer(threadMonitorExecutor);
        if (!handshakeHasBeenRead && !isReceiver && (handshakeRead || handshakeCancelled)) {
          if (logger.isDebugEnabled()) {
            if (handshakeRead) {
              logger.debug("handshake has been read {}", this);
            } else {
              logger.debug("handshake has been cancelled {}", this);
            }
          }
          handshakeHasBeenRead = true;
          // Once we have read the handshake for unshared connections, the reader can skip
          // processing messages
          if (!sharedResource || asyncMode) {
            break;
          } else {
            // not exiting and not a Reader spawned from a ServerSocket.accept(), so
            // let's set some state noting that this is happening
            hasResidualReaderThread = true;
          }
        }
      } catch (CancelException e) {
        if (logger.isDebugEnabled()) {
          logger.debug("{} Terminated <{}> due to cancellation", p2pReaderName(), this, e);
        }
        readerShuttingDown = true;
        try {
          requestClose(format("CacheClosed in channel read: %s", e));
        } catch (Exception ignored) {
          // best effort - we are exiting anyway
        }
        return;
      } catch (ClosedChannelException e) {
        readerShuttingDown = true;
        try {
          requestClose(format("ClosedChannelException in channel read: %s", e));
        } catch (Exception ignored) {
          // best effort - we are exiting anyway
        }
        return;
      } catch (IOException e) {
        // getMessage() may be null; read it once and guard every use so that logging can
        // never throw and prevent the close request below
        final String ioMessage = e.getMessage();
        // "Socket closed" check needed for Solaris jdk 1.4.2_08
        if (!isSocketClosed() && !"Socket closed".equalsIgnoreCase(ioMessage)) {
          if (logger.isInfoEnabled() && !isIgnorableIOException(e)) {
            logger.info("{} io exception for {}", p2pReaderName(), this, e);
          }
          if (logger.isDebugEnabled()) {
            if (ioMessage != null
                && ioMessage.contains("interrupted by a call to WSACancelBlockingCall")) {
              logger.debug(
                  "{} received unexpected WSACancelBlockingCall exception, which may result in a hang",
                  p2pReaderName());
            }
          }
        }
        readerShuttingDown = true;
        try {
          requestClose(format("IOException in channel read: %s", e));
        } catch (Exception ignored) {
          // best effort - we are exiting anyway
        }
        return;
      } catch (Exception e) {
        owner.getConduit().getCancelCriterion().checkCancelInProgress(e);
        if (!stopped && !isSocketClosed()) {
          logger.fatal(format("%s exception in channel read", p2pReaderName()), e);
        }
        readerShuttingDown = true;
        try {
          requestClose(format("%s exception in channel read", e));
        } catch (Exception ignored) {
          // best effort - we are exiting anyway
        }
        return;
      }
    }
  } finally {
    threadMonitoring.unregister(threadMonitorExecutor);
    hasResidualReaderThread = false;
    // only reset the state when this thread was not acting purely as a handshake reader
    if (!handshakeHasBeenRead || (sharedResource && !asyncMode)) {
      synchronized (stateLock) {
        connectionState = STATE_IDLE;
      }
    }
    if (logger.isDebugEnabled()) {
      logger.debug("readMessages terminated id={} from {} isHandshakeReader={}", conduitIdStr,
          remoteMember, handshakeHasBeenRead);
    }
  }
}