in modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java [5397:5595]
/**
 * Processes a node left message.
 * <p>
 * When the local node is the one leaving, the local {@code spiState} is moved through
 * {@code STOPPING} to {@code LEFT}, or the message is forwarded across the ring. For any other
 * leaving node the coordinator verifies the message; once the message is verified the node is
 * removed from the ring, the topology version is updated, an {@code EVT_NODE_LEFT} discovery
 * notification is issued and the message is passed on (or pending messages are processed locally
 * when there are no remote nodes to send to).
 *
 * @param msg Node left message, must not be {@code null}.
 */
private void processNodeLeftMessage(TcpDiscoveryNodeLeftMessage msg) {
assert msg != null;
UUID locNodeId = getLocalNodeId();
// The creator of a node left message is the node that is leaving.
UUID leavingNodeId = msg.creatorNodeId();
// Tag the message span with the ID of the leaving node.
msg.spanContainer().span()
.addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.ID), () -> leavingNodeId.toString());
// Case 1: the local node itself is leaving. Every path inside this branch returns.
if (locNodeId.equals(leavingNodeId)) {
// No sender ID: presumably the message was created locally and has not travelled
// the ring yet - TODO confirm. Switch to STOPPING and wake up threads waiting on mux.
if (msg.senderNodeId() == null) {
synchronized (mux) {
if (log.isDebugEnabled())
log.debug("Starting local node stop procedure.");
spiState = STOPPING;
mux.notifyAll();
}
}
// Leaving is complete when the message is verified, when there is nobody else in the ring,
// or when the message has a sender.
if (msg.verified() || !ring.hasRemoteNodes() || msg.senderNodeId() != null) {
// Addresses are unregistered only for a shared IP finder and only when no remote nodes remain.
if (spi.ipFinder.isShared() && !ring.hasRemoteNodes()) {
try {
spi.ipFinder.unregisterAddresses(
U.resolveAddresses(spi.getAddressResolver(), locNode.socketAddresses()));
}
catch (IgniteSpiException e) {
// Failure to unregister is logged and does not prevent the state transition below.
U.error(log, "Failed to unregister local node address from IP finder.", e);
}
}
// Move STOPPING -> LEFT (any other state is left untouched) and notify waiters.
synchronized (mux) {
if (spiState == STOPPING) {
spiState = LEFT;
mux.notifyAll();
}
}
return;
}
// Unverified message with no sender while remote nodes exist: send it across the ring.
sendMessageAcrossRing(msg);
return;
}
// Case 2: some other node is leaving.
// Drop the message if its sender is not present in the ring.
// NOTE(review): unlike the "node was not found" discard below, this path does not end
// the message span - confirm whether that is intentional.
if (ring.node(msg.senderNodeId()) == null) {
if (log.isDebugEnabled())
log.debug("Discarding node left message since sender node is not in topology: " + msg);
return;
}
TcpDiscoveryNode leavingNode = ring.node(leavingNodeId);
if (leavingNode != null) {
// Remember the node as leaving; it is removed from this set again after the node left
// handling further below.
synchronized (mux) {
leavingNodes.add(leavingNode);
}
}
else {
// Leaving node is not in the ring: discard the message and close its span as aborted.
if (log.isDebugEnabled())
log.debug("Discarding node left message since node was not found: " + msg);
msg.spanContainer().span()
.addLog(() -> "Discarded")
.setStatus(SpanStatus.ABORTED)
.end();
return;
}
boolean locNodeCoord = isLocalNodeCoordinator();
if (locNodeCoord) {
// An already verified message arriving at the coordinator is not processed again:
// its span is closed as aborted and a discard message for its ID is queued.
if (msg.verified()) {
msg.spanContainer().span()
.addLog(() -> "Ring failed")
.setStatus(SpanStatus.ABORTED)
.end();
addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id(), false));
return;
}
// The coordinator marks the message as verified with its own node ID.
msg.verify(locNodeId);
msg.spanContainer().span()
.addLog(() -> "Verified");
}
// Only verified messages change the topology. The second operand is always true here,
// because the local-node case above returns on every path.
if (msg.verified() && !locNodeId.equals(leavingNodeId)) {
TcpDiscoveryNode leftNode = ring.removeNode(leavingNodeId);
interruptPing(leavingNode);
assert leftNode != null : msg;
if (log.isDebugEnabled())
log.debug("Removed node from topology: " + leftNode);
long topVer;
if (locNodeCoord) {
// The coordinator assigns the new topology version and stamps it into the message.
topVer = ring.incrementTopologyVersion();
msg.topologyVersion(topVer);
}
else {
// Other nodes take the topology version carried by the message.
topVer = msg.topologyVersion();
assert topVer > 0 : "Topology version is empty for message: " + msg;
// The update call sits outside the assert, so it runs even when assertions are disabled;
// only the check of its result is assertion-dependent.
boolean b = ring.topologyVersion(topVer);
assert b : "Topology version has not been updated: [ring=" + ring + ", msg=" + msg +
", lastMsg=" + lastMsg + ", spiState=" + spiStateCopy() + ']';
if (log.isDebugEnabled())
log.debug("Topology version has been updated: [ring=" + ring + ", msg=" + msg + ']');
lastMsg = msg;
}
if (msg.client()) {
// Client node: drop its message worker and, if there was one, hand the message to it.
ClientMessageWorker wrk = clientMsgWorkers.remove(leavingNodeId);
if (wrk != null)
wrk.addMessage(msg);
}
else if (leftNode.equals(next) && sock != null) {
// The leaving node is the current next node and a socket to it is open:
// send the verified message directly to it before dropping the connection.
try {
// Timeout is the failure detection timeout when enabled, the socket timeout otherwise.
spi.writeToSocket(sock, out, msg, spi.failureDetectionTimeoutEnabled() ?
spi.failureDetectionTimeout() : spi.getSocketTimeout());
if (log.isDebugEnabled())
log.debug("Sent verified node left message to leaving node: " + msg);
}
catch (IgniteCheckedException | IOException e) {
// A failed send is reported but does not abort processing of the message.
if (log.isDebugEnabled())
log.debug("Failed to send verified node left message to leaving node [msg=" + msg +
", err=" + e.getMessage() + ']');
onException("Failed to send verified node left message to leaving node [msg=" + msg +
", err=" + e.getMessage() + ']', e);
}
finally {
// Regardless of the send result: set the force-pending flag (reset after the ring send
// below), clear the next node and close the socket.
forceSndPending = true;
newNextNode(null);
U.closeQuiet(sock);
}
}
synchronized (mux) {
joiningNodes.remove(leftNode.id());
}
spi.stats.onNodeLeft();
// Notify discovery about the node left event and record whether the notification happened.
boolean notified = notifyDiscovery(EVT_NODE_LEFT, topVer, leftNode, msg.spanContainer());
notifiedDiscovery.set(notified);
// The node is gone: clear it from the failed/leaving tracking collections.
synchronized (mux) {
failedNodes.remove(leftNode);
leavingNodes.remove(leftNode);
failedNodesMsgSent.remove(leftNode.id());
}
}
// This part runs for verified and unverified messages alike: pass the message on.
if (sendMessageToRemotes(msg)) {
try {
sendMessageAcrossRing(msg);
}
finally {
// Always reset the flag, even if sending across the ring throws.
forceSndPending = false;
}
}
else {
// Nothing to send to: reset the flag, close the socket and process pending messages locally.
forceSndPending = false;
if (log.isDebugEnabled())
log.debug("Unable to send message across the ring (topology has no remote nodes): " + msg);
U.closeQuiet(sock);
processPendingMessagesLocally(msg);
}
checkPendingCustomMessages();
}