in modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java [4910:5207]
/**
 * Processes a node added message received by the local node.
 * <p>
 * Depending on the role of the local node and the state of the message, the method either:
 * <ul>
 * <li>discards the message (stale order, node already seen, empty topology, already processed);</li>
 * <li>on the coordinator, verifies an unverified message, or completes an already verified one by
 * processing a {@code TcpDiscoveryNodeAddFinishedMessage} and queueing a discard message;</li>
 * <li>passes the message on across the ring when the joining node is already in the local ring;</li>
 * <li>for a verified message about another node, optionally authenticates that node, adds it to the
 * local ring and handles its discovery data;</li>
 * <li>for a verified message about the local node, restores local topology, topology history and
 * pending messages from the message.</li>
 * </ul>
 * Unless one of the early-return branches is taken, the message is finally sent across the ring
 * if {@code sendMessageToRemotes(msg)} allows it.
 *
 * @param msg Node added message, must not be {@code null} and must carry a non-null node.
 */
private void processNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) {
assert msg != null;
TcpDiscoveryNode node = msg.node();
assert node != null;
// Tag the message span with the ID and consistent ID of the node being added.
msg.spanContainer().span()
.addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.ID), () -> node.id().toString())
.addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.CONSISTENT_ID),
() -> node.consistentId().toString());
// Joining node's internal order is lower than the local one: discard unless the message is about
// the local node itself, in which case it is only logged and processing continues.
if (node.internalOrder() < locNode.internalOrder()) {
if (!locNode.id().equals(node.id())) {
U.warn(log, "Discarding node added message since local node's order is greater " +
"[node=" + node + ", ring=" + ring + ", msg=" + msg + ']');
// NOTE(review): unlike the discard branches further down, the span is not ended here -- confirm
// whether this is intentional.
return;
}
else {
// TODO IGNITE-11272
// NOTE(review): the debug text below contains the typo "appy" (presumably "apply").
if (log.isDebugEnabled())
log.debug("Received node added message with node order smaller than local node order " +
"(will appy) [node=" + node + ", ring=" + ring + ", msg=" + msg + ']');
}
}
UUID locNodeId = getLocalNodeId();
if (isLocalNodeCoordinator()) {
if (msg.verified()) {
// Coordinator received an already verified message: build the node add finished message,
// process it locally and queue a discard message for the original message ID.
TcpDiscoveryNodeAddFinishedMessage addFinishMsg = new TcpDiscoveryNodeAddFinishedMessage(locNodeId,
node.id());
// A non-null client router node ID means the finish message also carries discovery data and
// node attributes (presumably the joining node is a client -- confirm).
if (node.clientRouterNodeId() != null) {
addFinishMsg.clientDiscoData(msg.gridDiscoveryData());
addFinishMsg.clientNodeAttributes(node.attributes());
}
// Link tracing of the finish message to the current message and tag it with the same node IDs.
addFinishMsg = tracing.messages().branch(addFinishMsg, msg);
addFinishMsg.spanContainer().span()
.addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.ID), () -> node.id().toString())
.addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.CONSISTENT_ID),
() -> node.consistentId().toString());
processNodeAddFinishedMessage(addFinishMsg);
tracing.messages().finishProcessing(addFinishMsg);
addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id(), false));
return;
}
// Coordinator received an unverified message: mark it as verified by the local node and
// fall through to the final send across the ring.
msg.verify(locNodeId);
msg.spanContainer().span()
.addLog(() -> "Verified");
}
else if (!locNodeId.equals(node.id()) && ring.node(node.id()) != null) {
// Local node already has node from message in local topology.
// Just pass it to coordinator via the ring.
if (sendMessageToRemotes(msg))
sendMessageAcrossRing(msg);
if (log.isDebugEnabled()) {
log.debug("Local node already has node being added. Passing TcpDiscoveryNodeAddedMessage to " +
"coordinator for final processing [ring=" + ring + ", node=" + node + ", locNode="
+ locNode + ", msg=" + msg + ']');
}
if (debugMode) {
debugLog(msg, "Local node already has node being added. Passing TcpDiscoveryNodeAddedMessage to " +
"coordinator for final processing [ring=" + ring + ", node=" + node + ", locNode="
+ locNode + ", msg=" + msg + ']');
}
msg.spanContainer().span()
.addLog(() -> "Bypassed to crd")
.setStatus(SpanStatus.OK)
.end();
return;
}
// Verified message about another node: validate it and add the node to the local ring.
if (msg.verified() && !locNodeId.equals(node.id())) {
// A non-client node whose ID is already in the node IDs history is rejected.
if (!node.isClient() && nodesIdsHist.contains(node.id())) {
U.warn(log, "Discarding node added message since local node has already seen " +
"joining node in topology [node=" + node + ", locNode=" + locNode + ", msg=" + msg + ']');
msg.spanContainer().span()
.addLog(() -> "Discarded")
.setStatus(SpanStatus.ABORTED)
.end();
return;
}
// The new node's order must be strictly greater than the maximum order known to the ring.
if (node.internalOrder() <= ring.maxInternalOrder()) {
if (log.isDebugEnabled())
log.debug("Discarding node added message since new node's order is less than " +
"max order in ring [ring=" + ring + ", node=" + node + ", locNode=" + locNode +
", msg=" + msg + ']');
if (debugMode)
debugLog(msg, "Discarding node added message since new node's order is less than " +
"max order in ring [ring=" + ring + ", node=" + node + ", locNode=" + locNode +
", msg=" + msg + ']');
msg.spanContainer().span()
.addLog(() -> "Discarded")
.setStatus(SpanStatus.ABORTED)
.end();
return;
}
// Remember the node as joining; joiningNodes is modified under mux.
synchronized (mux) {
joiningNodes.add(node.id());
}
// Non-coordinator nodes authenticate the joining node themselves when the authenticator
// reports global node authentication.
if (!isLocalNodeCoordinator() && spi.nodeAuth != null && spi.nodeAuth.isGlobalNodeAuthentication()) {
// Starts as true so that an exception thrown below is treated as a failed authentication
// by the finally block.
boolean authFailed = true;
try {
SecurityCredentials cred = unmarshalCredentials(node);
if (cred == null) {
if (log.isDebugEnabled())
log.debug(
"Skipping global authentication for node (security credentials not found, " +
"probably, due to coordinator has older version) " +
"[nodeId=" + node.id() +
", addrs=" + U.addressesAsString(node) +
", coord=" + ring.coordinator() + ']');
authFailed = false;
}
else {
SecurityContext subj = spi.nodeAuth.authenticateNode(node, cred);
if (subj == null) {
// Node has not passed authentication.
LT.warn(log, "Authentication failed [nodeId=" + node.id() +
", addrs=" + U.addressesAsString(node) + ']');
// Always output in debug.
// NOTE(review): the debug text below has no closing ']', unlike the warning above.
if (log.isDebugEnabled())
log.debug("Authentication failed [nodeId=" + node.id() + ", addrs=" +
U.addressesAsString(node));
}
else
// Node will not be kicked out.
authFailed = false;
}
}
catch (IgniteException e) {
U.error(log, "Failed to verify node permissions consistency (will drop the node): " + node, e);
}
finally {
if (authFailed) {
// Notify the joining node directly; a send failure is logged and reported via onException,
// and does not prevent queueing the node failed message below.
try {
trySendMessageDirectly(
node,
new TcpDiscoveryAuthFailedMessage(locNodeId, spi.locHost, node.id())
);
}
catch (IgniteSpiException e) {
if (log.isDebugEnabled())
log.debug("Failed to send unauthenticated message to node " +
"[node=" + node + ", err=" + e.getMessage() + ']');
onException("Failed to send unauthenticated message to node " +
"[node=" + node + ", err=" + e.getMessage() + ']', e);
}
// NOTE(review): there is no return after a failed authentication -- the node is still added
// to the ring below; presumably the queued node failed message removes it later -- confirm.
addMessage(new TcpDiscoveryNodeFailedMessage(locNodeId, node.id(),
node.internalOrder()));
}
}
}
// For client joins, initialize the node's alive time from the client failure detection timeout.
if (msg.client())
node.clientAliveTime(spi.clientFailureDetectionTimeout());
// The result of ring.add() is treated as "topology changed" by the code below.
boolean topChanged = ring.add(node);
if (topChanged) {
assert !node.visible() : "Added visible node [node=" + node + ", locNode=" + locNode + ']';
DiscoveryDataPacket dataPacket = msg.gridDiscoveryData();
assert dataPacket != null : msg;
dataPacket.joiningNodeClient(msg.client());
if (dataPacket.hasJoiningNodeData()) {
if (spiState == CONNECTED) {
// Node already connected to the cluster can apply joining nodes' disco data immediately
spi.onExchange(dataPacket, U.resolveClassLoader(spi.ignite().configuration()));
spi.collectExchangeData(dataPacket);
}
else if (spiState == CONNECTING)
// Node joining to the cluster should postpone applying disco data of other joiners till
// receiving gridDiscoData (when NodeAddFinished message arrives)
joiningNodesDiscoDataList.add(dataPacket);
}
processMessageFailedNodes(msg);
}
if (log.isDebugEnabled())
log.debug("Added node to local ring [added=" + topChanged + ", node=" + node +
", ring=" + ring + ']');
}
// Verified message about the local node itself: initialize local state from the message.
if (msg.verified() && locNodeId.equals(node.id())) {
synchronized (mux) {
// Only applied while the local node is still connecting and the order carried by the message
// differs from the local node's current order; otherwise the message is discarded as
// already processed.
if (spiState == CONNECTING && locNode.internalOrder() != node.internalOrder()) {
// Initialize topology.
Collection<TcpDiscoveryNode> top = msg.topology();
if (top != null && !top.isEmpty()) {
spi.gridStartTime = msg.gridStartTime();
for (TcpDiscoveryNode n : top) {
assert n.internalOrder() < node.internalOrder() :
"Invalid node [topNode=" + n + ", added=" + node + ']';
// Make all preceding nodes and local node visible.
n.visible(true);
}
joiningNodes.clear();
locNode.setAttributes(node.attributes());
locNode.visible(true);
// Restore topology with all nodes visible.
ring.restoreTopology(top, node.internalOrder());
if (log.isDebugEnabled())
log.debug("Restored topology from node added message: " + ring);
// Take discovery data, topology history and pending messages from the message;
// the list of postponed joining nodes' discovery data is reset.
gridDiscoveryData = msg.gridDiscoveryData();
joiningNodesDiscoDataList = new ArrayList<>();
topHist.clear();
topHist.putAll(msg.topologyHistory());
pendingMsgs.reset(msg.messages(), msg.discardedMessageId(),
msg.discardedCustomMessageId());
// Clear data to minimize message size.
msg.messages(null, null, null);
msg.topology(null);
msg.topologyHistory(null);
msg.clearDiscoveryData();
}
else {
if (log.isDebugEnabled())
log.debug("Discarding node added message with empty topology: " + msg);
msg.spanContainer().span()
.addLog(() -> "Discarded")
.setStatus(SpanStatus.ABORTED)
.end();
return;
}
}
else {
if (log.isDebugEnabled())
log.debug("Discarding node added message (this message has already been processed) " +
"[spiState=" + spiState +
", msg=" + msg +
", locNode=" + locNode + ']');
msg.spanContainer().span()
.addLog(() -> "Discarded")
.setStatus(SpanStatus.ABORTED)
.end();
return;
}
}
processMessageFailedNodes(msg);
}
// Pass the message on to the next node in the ring when there are remote nodes to send to.
if (sendMessageToRemotes(msg))
sendMessageAcrossRing(msg);
}