in geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java [1341:1483]
/**
 * Closes this connection and, when requested, removes it from the owner's tables.
 *
 * <p>
 * The method has two phases:
 * <ol>
 * <li><b>Shutdown</b> - executed only by the first caller that flips {@code closing} from
 * {@code false} to {@code true}. It marks the connection stopped, waits for any in-progress
 * async queuing to finish, clears {@code connected}, adjusts sender/receiver stats, closes the
 * socket, releases the input buffer for senders, wakes handshake waiters, waits a bounded time
 * for the reader thread, and closes the batch buffer and message destreamers.</li>
 * <li><b>Cleanup</b> - executed when {@code cleanupEndpoint} is true, by the first caller and
 * also by any later caller that passes {@code forceRemoval}. It removes this connection from
 * the owner via the {@code owner.removeXxx} calls.</li>
 * </ol>
 * After those phases, {@code idleTask} and {@code ackTimeoutTask} are cancelled if non-null.
 *
 * <p>
 * A later caller that does not pass {@code forceRemoval} returns immediately without doing
 * anything.
 *
 * @param reason text handed to {@code owner.removeEndpoint} / {@code owner.removeSharedConnection}
 * @param cleanupEndpoint if true, run the cleanup phase that removes this connection from owner
 * @param p_removeEndpoint requested value for endpoint removal; it is overridden to false when
 *        this call performs the shutdown, the connection was not {@code connected}, and
 *        {@code forceRemoval} is false
 * @param beingSick if true, do not wait for the reader thread to exit
 * @param forceRemoval if true, run the cleanup phase even when a close was already started, and
 *        keep {@code p_removeEndpoint} even if the connection was not {@code connected}
 */
private void close(String reason, boolean cleanupEndpoint, boolean p_removeEndpoint,
boolean beingSick, boolean forceRemoval) {
// use getAndSet outside sync on this
// onlyCleanup == true means some earlier call already set closing, so the shutdown phase
// belongs to that call and this call may at most redo the cleanup phase.
boolean onlyCleanup = closing.getAndSet(true);
if (onlyCleanup && !forceRemoval) {
return;
}
boolean removeEndpoint = p_removeEndpoint;
if (!onlyCleanup) {
synchronized (this) {
stopped = true;
if (connected) {
if (asyncQueuingInProgress && pusherThread != Thread.currentThread()) {
// We don't need to do this if we are the pusher thread
// and we have determined that we need to close the connection.
synchronized (outgoingQueue) {
// wait for the flusher to complete (it may timeout)
// NOTE(review): wait() itself has no timeout here; the loop only ends once
// asyncQueuingInProgress is observed false - presumably the flusher clears it and
// notifies outgoingQueue; confirm against the pusher code.
while (asyncQueuingInProgress) {
// Clear the interrupt flag (remembering it) so wait() blocks instead of throwing
// immediately; the flag is restored in the finally block on every iteration.
boolean interrupted = Thread.interrupted();
try {
outgoingQueue.wait(); // spurious wakeup ok
} catch (InterruptedException ie) {
interrupted = true;
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}
}
}
connected = false;
final DMStats stats = owner.getConduit().getStats();
// Stats are only decremented when finishedConnecting is set.
if (finishedConnecting) {
if (isReceiver) {
stats.decReceivers();
} else {
stats.decSenders(sharedResource, preserveOrder);
}
}
} else if (!forceRemoval) {
// Was not connected: do not remove the endpoint unless the caller forces removal.
removeEndpoint = false;
}
// make sure our socket is closed
asyncClose(false);
if (!isReceiver && !hasResidualReaderThread()) {
// receivers release the input buffer when exiting run(). Senders use the
// inputBuffer for reading direct-reply responses
inputBufferVendor.destruct();
}
lengthSet = false;
}
// Make sure anyone waiting for a handshake stops waiting
notifyHandshakeWaiter(false);
// wait a bit for our reader thread to exit; don't wait if we are the reader thread
boolean isIBM = false;
// if network partition detection is enabled or this is an admin-only or locator vm
// we can't wait for the reader thread when running in an IBM JRE
if (conduit.getConfig().getEnableNetworkPartitionDetection()
|| conduit.getMemberId().getVmKind() == ClusterDistributionManager.ADMIN_ONLY_DM_TYPE
|| conduit.getMemberId().getVmKind() == ClusterDistributionManager.LOCATOR_DM_TYPE) {
isIBM = "IBM Corporation".equals(System.getProperty("java.vm.vendor"));
}
// Now that readerThread is returned to a pool after we close
// we need to be more careful not to join on a thread that belongs
// to someone else.
// Take a local snapshot so the null check and the join act on the same Thread object.
Thread readerThreadSnapshot = readerThread;
if (!beingSick && readerThreadSnapshot != null && !isIBM && isRunning
&& !readerShuttingDown && readerThreadSnapshot != Thread.currentThread()) {
try {
// First bounded wait: up to 500 ms.
readerThreadSnapshot.join(500);
// Re-read the field: it may have changed (or become null) while we were waiting.
readerThreadSnapshot = readerThread;
if (isRunning && !readerShuttingDown && readerThreadSnapshot != null
&& owner.getDM().getRootCause() == null) {
// don't wait twice if there's a system failure
// Second bounded wait: up to a further 1500 ms.
readerThreadSnapshot.join(1500);
if (isRunning) {
logger.info("Timed out waiting for readerThread on {} to finish.",
this);
}
}
} catch (IllegalThreadStateException ignore) {
// ignored - thread already stopped
} catch (InterruptedException ignore) {
// Restore the interrupt flag for callers further up the stack.
Thread.currentThread().interrupt();
// but keep going, we're trying to close.
}
}
closeBatchBuffer();
closeAllMsgDestreamers();
}
// Cleanup phase: runs for the first closer and for any later caller that forces removal.
if (cleanupEndpoint) {
if (isReceiver) {
owner.removeReceiver(this);
}
if (removeEndpoint) {
if (sharedResource) {
if (!preserveOrder) {
// only remove endpoint when shared unordered connection is closed
if (!isReceiver) {
// Only remove endpoint if sender.
if (finishedConnecting) {
// only remove endpoint if our constructor finished
owner.removeEndpoint(remoteMember, reason);
}
}
} else {
// Shared ordered connection: remove just this connection, not the whole endpoint.
// noinspection ConstantConditions
owner.removeSharedConnection(reason, remoteMember, preserveOrder, this);
}
} else if (!isReceiver) {
// Non-shared sender: remove the thread-owned connection entry.
owner.removeThreadConnection(remoteMember, this);
}
} else {
// This code is ok to do even if the ConnectionTable has never added this Connection to its
// maps since the calls in this block use our identity to do the removes.
if (sharedResource) {
owner.removeSharedConnection(reason, remoteMember, preserveOrder, this);
} else if (!isReceiver) {
owner.removeThreadConnection(remoteMember, this);
}
}
}
// This cancels the idle timer task, but it also removes the task's reference to this
// connection, freeing up the connection (and its buffers) for GC sooner.
if (idleTask != null) {
synchronized (idleTask) {
idleTask.cancel();
}
}
// Same treatment for the ack-timeout task: cancel it while holding the task's own monitor.
if (ackTimeoutTask != null) {
synchronized (ackTimeoutTask) {
ackTimeoutTask.cancel();
}
}
}