in modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java [4136:4756]
/**
 * Processes a join request from a node attempting to enter the topology.
 * <p>
 * On the coordinator this method performs the full admission sequence:
 * duplicate-ID detection, authentication (when {@code spi.nodeAuth} is configured),
 * pluggable node validation, and a series of pairwise compatibility checks
 * (marshaller type, optimized-marshaller SUID mode, binary compact footer,
 * binary string serialization version, late affinity assignment). If all checks
 * pass, the node is assigned an internal order and a
 * {@link TcpDiscoveryNodeAddedMessage} is produced and processed. On
 * non-coordinator nodes the request is simply forwarded across the ring.
 * <p>
 * Every drop/ignore path ends the message's tracing span with
 * {@code SpanStatus.ABORTED}. Rejection notices are delivered to the joining
 * node directly via {@code trySendMessageDirectly} (the joining node is not in
 * the ring yet, so ring delivery is impossible). Potentially blocking rejection
 * work (ping + direct send) is offloaded to {@code utilityPool} so the
 * ring-message worker is not stalled.
 *
 * @param msg Join request message. Must not be {@code null}.
 */
private void processJoinRequestMessage(final TcpDiscoveryJoinRequestMessage msg) {
    assert msg != null;

    final TcpDiscoveryNode node = msg.node();

    final UUID locNodeId = getLocalNodeId();

    // Tag the tracing span with the joining node's identity.
    msg.spanContainer().span()
        .addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.ID), () -> node.id().toString())
        .addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.CONSISTENT_ID),
            () -> node.consistentId().toString());

    // A join request carrying our own node ID is a stale or reflected message - drop it.
    if (locNodeId.equals(node.id())) {
        if (log.isDebugEnabled())
            log.debug("Received join request for local node, dropping: " + msg);

        msg.spanContainer().span()
            .addLog(() -> "Dropped")
            .setStatus(SpanStatus.ABORTED)
            .end();

        return;
    }

    if (!msg.client()) {
        boolean rmtHostLoopback = node.socketAddresses().size() == 1 &&
            node.socketAddresses().iterator().next().getAddress().isLoopbackAddress();

        // This check is performed by the node joining node is connected to, but not by coordinator
        // because loopback problem message is sent directly to the joining node which may be unavailable
        // if coordinator resides on another host.
        if (spi.locHost.isLoopbackAddress() != rmtHostLoopback) {
            String firstNode = rmtHostLoopback ? "remote" : "local";

            String secondNode = rmtHostLoopback ? "local" : "remote";

            String errMsg = "Failed to add node to topology because " + firstNode +
                " node is configured to use loopback address, but " + secondNode + " node is not " +
                "(consider changing 'localAddress' configuration parameter) " +
                "[locNodeAddrs=" + U.addressesAsString(locNode) +
                ", rmtNodeAddrs=" + U.addressesAsString(node) + ']';

            LT.warn(log, errMsg);

            // Always output in debug.
            if (log.isDebugEnabled())
                log.debug(errMsg);

            try {
                trySendMessageDirectly(node, new TcpDiscoveryLoopbackProblemMessage(
                    locNodeId, locNode.addresses(), locNode.hostNames()));
            }
            catch (IgniteSpiException e) {
                if (log.isDebugEnabled()) {
                    log.debug("Failed to send loopback problem message to node " +
                        "[node=" + node + ", err=" + e.getMessage() + ']');
                }

                onException("Failed to send loopback problem message to node " +
                    "[node=" + node + ", err=" + e.getMessage() + ']', e);
            }

            // Ignore join request.
            msg.spanContainer().span()
                .addLog(() -> "Ignored")
                .setStatus(SpanStatus.ABORTED)
                .end();

            return;
        }
    }

    if (isLocalNodeCoordinator()) {
        TcpDiscoveryNode existingNode = ring.node(node.id());

        if (existingNode != null) {
            // Same ID but different addresses: either a genuine duplicate ID or a restarted node
            // whose predecessor has not yet been removed from the ring.
            if (!node.socketAddresses().equals(existingNode.socketAddresses())) {
                if (!pingNode(existingNode)) {
                    U.warn(log, "Sending node failed message for existing node: " + node);

                    addMessage(new TcpDiscoveryNodeFailedMessage(locNodeId,
                        existingNode.id(), existingNode.internalOrder()));

                    // Ignore this join request since existing node is about to fail
                    // and new node can continue.
                    msg.spanContainer().span()
                        .addLog(() -> "Ignored")
                        .setStatus(SpanStatus.ABORTED)
                        .end();

                    return;
                }

                try {
                    trySendMessageDirectly(node, createTcpDiscoveryDuplicateIdMessage(locNodeId, existingNode));
                }
                catch (IgniteSpiException e) {
                    if (log.isDebugEnabled()) {
                        log.debug("Failed to send duplicate ID message to node " +
                            "[node=" + node + ", existingNode=" + existingNode +
                            ", err=" + e.getMessage() + ']');
                    }

                    onException("Failed to send duplicate ID message to node " +
                        "[node=" + node + ", existingNode=" + existingNode + ']', e);
                }

                // Output warning.
                LT.warn(log, "Ignoring join request from node (duplicate ID) [node=" + node +
                    ", existingNode=" + existingNode + ']');

                // Ignore join request.
                msg.spanContainer().span()
                    .addLog(() -> "Ignored")
                    .setStatus(SpanStatus.ABORTED)
                    .end();

                return;
            }

            if (msg.client()) {
                // Client is already in topology: answer with a reconnect message instead of re-adding it.
                TcpDiscoveryClientReconnectMessage reconMsg = new TcpDiscoveryClientReconnectMessage(node.id(),
                    node.clientRouterNodeId(),
                    null);

                reconMsg.verify(getLocalNodeId());

                Collection<TcpDiscoveryAbstractMessage> msgs = msgHist.messages(null, node);

                if (msgs != null) {
                    reconMsg.pendingMessages(msgs);

                    reconMsg.success(true);
                }

                if (log.isDebugEnabled()) {
                    log.debug("Send reconnect message to already joined client " +
                        "[clientNode=" + existingNode + ", msg=" + reconMsg + ']');
                }

                // If we are the client's router, deliver directly through its message worker;
                // otherwise route the reconnect message across the ring.
                if (getLocalNodeId().equals(node.clientRouterNodeId())) {
                    ClientMessageWorker wrk = clientMsgWorkers.get(node.id());

                    if (wrk != null)
                        wrk.addMessage(reconMsg);
                    else if (log.isDebugEnabled()) {
                        log.debug("Failed to find client message worker " +
                            "[clientNode=" + existingNode + ", msg=" + reconMsg + ']');
                    }
                }
                else {
                    if (sendMessageToRemotes(reconMsg))
                        sendMessageAcrossRing(reconMsg);
                }
            }
            else if (log.isDebugEnabled())
                log.debug("Ignoring join request message since node is already in topology: " + msg);

            msg.spanContainer().span()
                .addLog(() -> "Ignored")
                .setStatus(SpanStatus.ABORTED)
                .end();

            return;
        }
        else {
            // Server node whose ID was seen before in this topology history: reject as duplicate.
            if (!node.isClient()) {
                if (nodesIdsHist.contains(node.id())) {
                    try {
                        trySendMessageDirectly(node, createTcpDiscoveryDuplicateIdMessage(locNodeId, node));
                    }
                    catch (IgniteSpiException e) {
                        if (log.isDebugEnabled()) {
                            log.debug("Failed to send duplicate ID message to node " +
                                "[node=" + node +
                                ", err=" + e.getMessage() + ']');
                        }

                        onException("Failed to send duplicate ID message to node: " + node, e);
                    }

                    msg.spanContainer().span()
                        .addLog(() -> "Ignored")
                        .setStatus(SpanStatus.ABORTED)
                        .end();

                    return;
                }
            }
        }

        IgniteNodeValidationResult err = ensureJoinEnabled(node);

        if (spi.nodeAuth != null && err == null) {
            // Authenticate node first.
            try {
                SecurityCredentials cred = unmarshalCredentials(node);

                SecurityContext subj = spi.nodeAuth.authenticateNode(node, cred);

                if (subj == null) {
                    // Node has not pass authentication.
                    LT.warn(log, "Authentication failed [nodeId=" + node.id() +
                        ", addrs=" + U.addressesAsString(node) + ']');

                    // Always output in debug.
                    if (log.isDebugEnabled()) {
                        // Fix: append closing ']' so the bracketed attribute list is well-formed
                        // (matches the LT.warn message above).
                        log.debug("Authentication failed [nodeId=" + node.id() + ", addrs=" +
                            U.addressesAsString(node) + ']');
                    }

                    try {
                        trySendMessageDirectly(
                            node,
                            new TcpDiscoveryAuthFailedMessage(locNodeId, spi.locHost, node.id())
                        );
                    }
                    catch (IgniteSpiException e) {
                        if (log.isDebugEnabled()) {
                            log.debug("Failed to send unauthenticated message to node " +
                                "[node=" + node + ", err=" + e.getMessage() + ']');
                        }

                        onException("Failed to send unauthenticated message to node " +
                            "[node=" + node + ", err=" + e.getMessage() + ']', e);
                    }

                    // Ignore join request.
                    msg.spanContainer().span()
                        .addLog(() -> "Ignored")
                        .setStatus(SpanStatus.ABORTED)
                        .end();

                    return;
                }
                else {
                    // The security context travels with the node as an attribute, so it must serialize.
                    if (!(subj instanceof Serializable)) {
                        // Node has not pass authentication.
                        LT.warn(log, "Authentication subject is not Serializable [nodeId=" + node.id() +
                            ", addrs=" + U.addressesAsString(node) + ']');

                        // Always output in debug.
                        if (log.isDebugEnabled()) {
                            // Fix: append closing ']' so the bracketed attribute list is well-formed.
                            log.debug("Authentication subject is not serializable [nodeId=" + node.id() +
                                ", addrs=" + U.addressesAsString(node) + ']');
                        }

                        try {
                            trySendMessageDirectly(
                                node,
                                new TcpDiscoveryAuthFailedMessage(locNodeId, spi.locHost, node.id())
                            );
                        }
                        catch (IgniteSpiException e) {
                            if (log.isDebugEnabled()) {
                                log.debug("Failed to send unauthenticated message to node " +
                                    "[node=" + node + ", err=" + e.getMessage() + ']');
                            }

                            // Consistency fix: report to the exception listener like the
                            // identical catch block in the subj == null branch above.
                            onException("Failed to send unauthenticated message to node " +
                                "[node=" + node + ", err=" + e.getMessage() + ']', e);
                        }

                        // Ignore join request.
                        msg.spanContainer().span()
                            .addLog(() -> "Ignored")
                            .setStatus(SpanStatus.ABORTED)
                            .end();

                        return;
                    }

                    // Stick in authentication subject to node (use security-safe attributes for copy).
                    node.setAttributes(withSecurityContext(subj, node.getAttributes(), spi.marshaller()));
                }
            }
            catch (IgniteException | IgniteCheckedException e) {
                LT.error(log, e, "Authentication failed [nodeId=" + node.id() + ", addrs=" +
                    U.addressesAsString(node) + ']');

                if (log.isDebugEnabled()) {
                    log.debug("Failed to authenticate node (will ignore join request) [node=" + node +
                        ", err=" + e + ']');
                }

                onException("Failed to authenticate node (will ignore join request) [node=" + node +
                    ", err=" + e + ']', e);

                // Ignore join request.
                msg.spanContainer().span()
                    .addLog(() -> "Ignored")
                    .setStatus(SpanStatus.ABORTED)
                    .end();

                return;
            }
        }

        // Pluggable validation: first without, then with the joining node's discovery data.
        if (err == null)
            err = spi.getSpiContext().validateNode(node);

        if (err == null) {
            try {
                DiscoveryDataBag data = msg.gridDiscoveryData().unmarshalJoiningNodeData(
                    spi.marshaller(),
                    U.resolveClassLoader(spi.ignite().configuration()),
                    false,
                    log
                );

                err = spi.getSpiContext().validateNode(node, data);
            }
            catch (IgniteCheckedException e) {
                err = new IgniteNodeValidationResult(node.id(), e.getMessage());
            }
        }

        if (err != null) {
            final IgniteNodeValidationResult err0 = err;

            if (log.isDebugEnabled())
                log.debug("Node validation failed [res=" + err + ", node=" + node + ']');

            // Offload ping + direct send to the utility pool: both may block.
            utilityPool.execute(
                new Runnable() {
                    @Override public void run() {
                        spi.getSpiContext().recordEvent(new NodeValidationFailedEvent(locNode, node, err0));

                        boolean ping = node.id().equals(err0.nodeId()) ? pingNode(node) : pingNode(err0.nodeId());

                        if (!ping) {
                            if (log.isDebugEnabled()) {
                                log.debug("Conflicting node has already left, need to wait for event. " +
                                    "Will ignore join request for now since it will be recent [req=" + msg +
                                    ", err=" + err0.message() + ']');
                            }

                            // Ignore join request.
                            return;
                        }

                        LT.warn(log, err0.message());

                        // Always output in debug.
                        if (log.isDebugEnabled())
                            log.debug(err0.message());

                        try {
                            trySendMessageDirectly(node,
                                new TcpDiscoveryCheckFailedMessage(err0.nodeId(), err0.sendMessage()));
                        }
                        catch (IgniteSpiException e) {
                            if (log.isDebugEnabled()) {
                                log.debug("Failed to send hash ID resolver validation failed message to node " +
                                    "[node=" + node + ", err=" + e.getMessage() + ']');
                            }

                            onException("Failed to send hash ID resolver validation failed message to node " +
                                "[node=" + node + ", err=" + e.getMessage() + ']', e);
                        }
                    }
                }
            );

            // Ignore join request.
            msg.spanContainer().span()
                .addLog(() -> "Ignored")
                .setStatus(SpanStatus.ABORTED)
                .end();

            return;
        }

        // All nodes in topology must use the same marshaller implementation.
        final String locMarsh = locNode.attribute(ATTR_MARSHALLER);

        final String rmtMarsh = node.attribute(ATTR_MARSHALLER);

        if (!Objects.equals(locMarsh, rmtMarsh)) {
            utilityPool.execute(
                new Runnable() {
                    @Override public void run() {
                        String errMsg = "Local node's marshaller differs from remote node's marshaller " +
                            "(to make sure all nodes in topology have identical marshaller, " +
                            "configure marshaller explicitly in configuration) " +
                            "[locMarshaller=" + locMarsh + ", rmtMarshaller=" + rmtMarsh +
                            ", locNodeAddrs=" + U.addressesAsString(locNode) +
                            ", rmtNodeAddrs=" + U.addressesAsString(node) +
                            ", locNodeId=" + locNode.id() + ", rmtNodeId=" + msg.creatorNodeId() + ']';

                        LT.warn(log, errMsg);

                        // Always output in debug.
                        if (log.isDebugEnabled())
                            log.debug(errMsg);

                        try {
                            // Message sent to the joining node swaps loc/rmt so it reads
                            // correctly from that node's perspective.
                            String sndMsg = "Local node's marshaller differs from remote node's marshaller " +
                                "(to make sure all nodes in topology have identical marshaller, " +
                                "configure marshaller explicitly in configuration) " +
                                "[locMarshaller=" + rmtMarsh + ", rmtMarshaller=" + locMarsh +
                                ", locNodeAddrs=" + U.addressesAsString(node) + ", locPort=" + node.discoveryPort() +
                                ", rmtNodeAddr=" + U.addressesAsString(locNode) + ", locNodeId=" + node.id() +
                                ", rmtNodeId=" + locNode.id() + ']';

                            trySendMessageDirectly(node,
                                new TcpDiscoveryCheckFailedMessage(locNodeId, sndMsg));
                        }
                        catch (IgniteSpiException e) {
                            if (log.isDebugEnabled())
                                log.debug("Failed to send marshaller check failed message to node " +
                                    "[node=" + node + ", err=" + e.getMessage() + ']');

                            onException("Failed to send marshaller check failed message to node " +
                                "[node=" + node + ", err=" + e.getMessage() + ']', e);
                        }
                    }
                }
            );

            // Ignore join request.
            msg.spanContainer().span()
                .addLog(() -> "Ignored")
                .setStatus(SpanStatus.ABORTED)
                .end();

            return;
        }

        // If node have no value for this attribute then we treat it as true.
        final Boolean locMarshUseDfltSuid = locNode.attribute(ATTR_MARSHALLER_USE_DFLT_SUID);
        boolean locMarshUseDfltSuidBool = locMarshUseDfltSuid == null ? true : locMarshUseDfltSuid;

        final Boolean rmtMarshUseDfltSuid = node.attribute(ATTR_MARSHALLER_USE_DFLT_SUID);
        boolean rmtMarshUseDfltSuidBool = rmtMarshUseDfltSuid == null ? true : rmtMarshUseDfltSuid;

        Boolean locLateAssign = locNode.attribute(ATTR_LATE_AFFINITY_ASSIGNMENT);
        // Can be null only in tests.
        boolean locLateAssignBool = locLateAssign != null ? locLateAssign : false;

        // Optimized marshaller's default-SUID mode must match across the topology.
        if (locMarshUseDfltSuidBool != rmtMarshUseDfltSuidBool) {
            utilityPool.execute(
                new Runnable() {
                    @Override public void run() {
                        String errMsg = "Local node's " + IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID +
                            " property value differs from remote node's value " +
                            "(to make sure all nodes in topology have identical marshaller settings, " +
                            "configure system property explicitly) " +
                            "[locMarshUseDfltSuid=" + locMarshUseDfltSuid +
                            ", rmtMarshUseDfltSuid=" + rmtMarshUseDfltSuid +
                            ", locNodeAddrs=" + U.addressesAsString(locNode) +
                            ", rmtNodeAddrs=" + U.addressesAsString(node) +
                            ", locNodeId=" + locNode.id() + ", rmtNodeId=" + msg.creatorNodeId() + ']';

                        String sndMsg = "Local node's " + IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID +
                            " property value differs from remote node's value " +
                            "(to make sure all nodes in topology have identical marshaller settings, " +
                            "configure system property explicitly) " +
                            "[locMarshUseDfltSuid=" + rmtMarshUseDfltSuid +
                            ", rmtMarshUseDfltSuid=" + locMarshUseDfltSuid +
                            ", locNodeAddrs=" + U.addressesAsString(node) + ", locPort=" + node.discoveryPort() +
                            ", rmtNodeAddr=" + U.addressesAsString(locNode) + ", locNodeId=" + node.id() +
                            ", rmtNodeId=" + locNode.id() + ']';

                        nodeCheckError(
                            node,
                            errMsg,
                            sndMsg);
                    }
                });

            // Ignore join request.
            msg.spanContainer().span()
                .addLog(() -> "Ignored")
                .setStatus(SpanStatus.ABORTED)
                .end();

            return;
        }

        // Validate compact footer flags.
        Boolean locMarshCompactFooter = locNode.attribute(ATTR_MARSHALLER_COMPACT_FOOTER);
        final boolean locMarshCompactFooterBool = locMarshCompactFooter != null ? locMarshCompactFooter : false;

        Boolean rmtMarshCompactFooter = node.attribute(ATTR_MARSHALLER_COMPACT_FOOTER);
        final boolean rmtMarshCompactFooterBool = rmtMarshCompactFooter != null ? rmtMarshCompactFooter : false;

        if (locMarshCompactFooterBool != rmtMarshCompactFooterBool) {
            utilityPool.execute(
                new Runnable() {
                    @Override public void run() {
                        String errMsg = "Local node's binary marshaller \"compactFooter\" property differs from " +
                            "the same property on remote node (make sure all nodes in topology have the same value " +
                            "of \"compactFooter\" property) [locMarshallerCompactFooter=" + locMarshCompactFooterBool +
                            ", rmtMarshallerCompactFooter=" + rmtMarshCompactFooterBool +
                            ", locNodeAddrs=" + U.addressesAsString(locNode) +
                            ", rmtNodeAddrs=" + U.addressesAsString(node) +
                            ", locNodeId=" + locNode.id() + ", rmtNodeId=" + msg.creatorNodeId() + ']';

                        String sndMsg = "Local node's binary marshaller \"compactFooter\" property differs from " +
                            "the same property on remote node (make sure all nodes in topology have the same value " +
                            "of \"compactFooter\" property) [locMarshallerCompactFooter=" + rmtMarshCompactFooterBool +
                            ", rmtMarshallerCompactFooter=" + locMarshCompactFooterBool +
                            ", locNodeAddrs=" + U.addressesAsString(node) + ", locPort=" + node.discoveryPort() +
                            ", rmtNodeAddr=" + U.addressesAsString(locNode) + ", locNodeId=" + node.id() +
                            ", rmtNodeId=" + locNode.id() + ']';

                        nodeCheckError(
                            node,
                            errMsg,
                            sndMsg);
                    }
                });

            // Ignore join request.
            msg.spanContainer().span()
                .addLog(() -> "Ignored")
                .setStatus(SpanStatus.ABORTED)
                .end();

            return;
        }

        // Validate String serialization mechanism used by the BinaryMarshaller.
        final Boolean locMarshStrSerialVer2 = locNode.attribute(ATTR_MARSHALLER_USE_BINARY_STRING_SER_VER_2);
        final boolean locMarshStrSerialVer2Bool = locMarshStrSerialVer2 != null ? locMarshStrSerialVer2 : false;

        final Boolean rmtMarshStrSerialVer2 = node.attribute(ATTR_MARSHALLER_USE_BINARY_STRING_SER_VER_2);
        final boolean rmtMarshStrSerialVer2Bool = rmtMarshStrSerialVer2 != null ? rmtMarshStrSerialVer2 : false;

        if (locMarshStrSerialVer2Bool != rmtMarshStrSerialVer2Bool) {
            utilityPool.execute(
                new Runnable() {
                    @Override public void run() {
                        String errMsg = "Local node's " + IGNITE_BINARY_MARSHALLER_USE_STRING_SERIALIZATION_VER_2 +
                            " property value differs from remote node's value " +
                            "(to make sure all nodes in topology have identical marshaller settings, " +
                            "configure system property explicitly) " +
                            "[locMarshStrSerialVer2=" + locMarshStrSerialVer2 +
                            ", rmtMarshStrSerialVer2=" + rmtMarshStrSerialVer2 +
                            ", locNodeAddrs=" + U.addressesAsString(locNode) +
                            ", rmtNodeAddrs=" + U.addressesAsString(node) +
                            ", locNodeId=" + locNode.id() + ", rmtNodeId=" + msg.creatorNodeId() + ']';

                        String sndMsg = "Local node's " + IGNITE_BINARY_MARSHALLER_USE_STRING_SERIALIZATION_VER_2 +
                            " property value differs from remote node's value " +
                            "(to make sure all nodes in topology have identical marshaller settings, " +
                            "configure system property explicitly) " +
                            "[locMarshStrSerialVer2=" + rmtMarshStrSerialVer2 +
                            ", rmtMarshStrSerialVer2=" + locMarshStrSerialVer2 +
                            ", locNodeAddrs=" + U.addressesAsString(node) + ", locPort=" + node.discoveryPort() +
                            ", rmtNodeAddr=" + U.addressesAsString(locNode) + ", locNodeId=" + node.id() +
                            ", rmtNodeId=" + locNode.id() + ']';

                        nodeCheckError(
                            node,
                            errMsg,
                            sndMsg);
                    }
                });

            // Ignore join request.
            msg.spanContainer().span()
                .addLog(() -> "Ignored")
                .setStatus(SpanStatus.ABORTED)
                .end();

            return;
        }

        Boolean rmtLateAssign = node.attribute(ATTR_LATE_AFFINITY_ASSIGNMENT);
        // Can be null only in tests.
        boolean rmtLateAssignBool = rmtLateAssign != null ? rmtLateAssign : false;

        // Late affinity assignment mode must match across the topology.
        if (locLateAssignBool != rmtLateAssignBool) {
            String errMsg = "Local node's cache affinity assignment mode differs from " +
                "the same property on remote node (make sure all nodes in topology have the same " +
                "cache affinity assignment mode) [locLateAssign=" + locLateAssignBool +
                ", rmtLateAssign=" + rmtLateAssignBool +
                ", locNodeAddrs=" + U.addressesAsString(locNode) +
                ", rmtNodeAddrs=" + U.addressesAsString(node) +
                ", locNodeId=" + locNode.id() + ", rmtNodeId=" + msg.creatorNodeId() + ']';

            String sndMsg = "Local node's cache affinity assignment mode differs from " +
                "the same property on remote node (make sure all nodes in topology have the same " +
                "cache affinity assignment mode) [locLateAssign=" + rmtLateAssignBool +
                ", rmtLateAssign=" + locLateAssign +
                ", locNodeAddrs=" + U.addressesAsString(node) + ", locPort=" + node.discoveryPort() +
                ", rmtNodeAddr=" + U.addressesAsString(locNode) + ", locNodeId=" + node.id() +
                ", rmtNodeId=" + locNode.id() + ']';

            nodeCheckError(node, errMsg, sndMsg);

            // Ignore join request.
            msg.spanContainer().span()
                .addLog(() -> "Ignored")
                .setStatus(SpanStatus.ABORTED)
                .end();

            return;
        }

        // Handle join.
        node.internalOrder(ring.nextNodeOrder());

        if (log.isDebugEnabled())
            log.debug("Internal order has been assigned to node: " + node);

        DiscoveryDataPacket data = msg.gridDiscoveryData();

        TcpDiscoveryNodeAddedMessage nodeAddedMsg = new TcpDiscoveryNodeAddedMessage(locNodeId,
            node, data, spi.gridStartTime);

        // Branch the tracing span: the added-message continues the join request's trace.
        nodeAddedMsg = tracing.messages().branch(nodeAddedMsg, msg);

        nodeAddedMsg.client(msg.client());

        processNodeAddedMessage(nodeAddedMsg);

        tracing.messages().finishProcessing(nodeAddedMsg);
    }
    else {
        // Not the coordinator: forward the join request along the ring.
        if (sendMessageToRemotes(msg))
            sendMessageAcrossRing(msg);
    }
}