private void processJoinRequestMessage()

in modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java [4136:4756]


        /**
         * Processes join request message.
         * <p>
         * The request is dropped if it refers to the local node, and ignored if a joining server node and the local
         * node disagree on loopback address usage. If the local node is the coordinator, the joining node is checked
         * for a duplicate ID, authenticated (when {@code spi.nodeAuth} is set), validated, and its marshaller and
         * late affinity assignment attributes are compared with the local ones. When all checks pass the node gets
         * an internal order and a {@link TcpDiscoveryNodeAddedMessage} is processed. If the local node is not the
         * coordinator, the request is sent across the ring when {@code sendMessageToRemotes(msg)} allows it.
         * <p>
         * Every early return from this method ends the message span with {@link SpanStatus#ABORTED}.
         *
         * @param msg Join request message.
         */
        private void processJoinRequestMessage(final TcpDiscoveryJoinRequestMessage msg) {
            assert msg != null;

            final TcpDiscoveryNode node = msg.node();

            final UUID locNodeId = getLocalNodeId();

            msg.spanContainer().span()
                .addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.ID), () -> node.id().toString())
                .addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.CONSISTENT_ID),
                    () -> node.consistentId().toString());

            if (locNodeId.equals(node.id())) {
                if (log.isDebugEnabled())
                    log.debug("Received join request for local node, dropping: " + msg);

                abortJoinRequestSpan(msg, "Dropped");

                return;
            }

            if (!msg.client()) {
                boolean rmtHostLoopback = node.socketAddresses().size() == 1 &&
                    node.socketAddresses().iterator().next().getAddress().isLoopbackAddress();

                // This check is performed by the node which the joining node is connected to, rather than by
                // the coordinator, because the loopback problem message is sent directly to the joining node
                // which may be unavailable if the coordinator resides on another host.
                if (spi.locHost.isLoopbackAddress() != rmtHostLoopback) {
                    String firstNode = rmtHostLoopback ? "remote" : "local";

                    String secondNode = rmtHostLoopback ? "local" : "remote";

                    String errMsg = "Failed to add node to topology because " + firstNode +
                        " node is configured to use loopback address, but " + secondNode + " node is not " +
                        "(consider changing 'localAddress' configuration parameter) " +
                        "[locNodeAddrs=" + U.addressesAsString(locNode) +
                        ", rmtNodeAddrs=" + U.addressesAsString(node) + ']';

                    LT.warn(log, errMsg);

                    // Always output in debug.
                    if (log.isDebugEnabled())
                        log.debug(errMsg);

                    try {
                        trySendMessageDirectly(node, new TcpDiscoveryLoopbackProblemMessage(
                            locNodeId, locNode.addresses(), locNode.hostNames()));
                    }
                    catch (IgniteSpiException e) {
                        if (log.isDebugEnabled()) {
                            log.debug("Failed to send loopback problem message to node " +
                                "[node=" + node + ", err=" + e.getMessage() + ']');
                        }

                        onException("Failed to send loopback problem message to node " +
                            "[node=" + node + ", err=" + e.getMessage() + ']', e);
                    }

                    // Ignore join request.
                    abortJoinRequestSpan(msg, "Ignored");

                    return;
                }
            }

            if (isLocalNodeCoordinator()) {
                TcpDiscoveryNode existingNode = ring.node(node.id());

                if (existingNode != null) {
                    // A node with the same ID but different addresses is already in the ring.
                    if (!node.socketAddresses().equals(existingNode.socketAddresses())) {
                        if (!pingNode(existingNode)) {
                            U.warn(log, "Sending node failed message for existing node: " + node);

                            addMessage(new TcpDiscoveryNodeFailedMessage(locNodeId,
                                existingNode.id(), existingNode.internalOrder()));

                            // Ignore this join request since existing node is about to fail
                            // and new node can continue.
                            abortJoinRequestSpan(msg, "Ignored");

                            return;
                        }

                        try {
                            trySendMessageDirectly(node, createTcpDiscoveryDuplicateIdMessage(locNodeId, existingNode));
                        }
                        catch (IgniteSpiException e) {
                            if (log.isDebugEnabled()) {
                                log.debug("Failed to send duplicate ID message to node " +
                                    "[node=" + node + ", existingNode=" + existingNode +
                                    ", err=" + e.getMessage() + ']');
                            }

                            onException("Failed to send duplicate ID message to node " +
                                "[node=" + node + ", existingNode=" + existingNode + ']', e);
                        }

                        // Output warning.
                        LT.warn(log, "Ignoring join request from node (duplicate ID) [node=" + node +
                            ", existingNode=" + existingNode + ']');

                        // Ignore join request.
                        abortJoinRequestSpan(msg, "Ignored");

                        return;
                    }

                    if (msg.client()) {
                        // Client is already in topology: answer with a reconnect message instead of joining it again.
                        TcpDiscoveryClientReconnectMessage reconMsg = new TcpDiscoveryClientReconnectMessage(node.id(),
                            node.clientRouterNodeId(),
                            null);

                        reconMsg.verify(getLocalNodeId());

                        Collection<TcpDiscoveryAbstractMessage> msgs = msgHist.messages(null, node);

                        if (msgs != null) {
                            reconMsg.pendingMessages(msgs);

                            reconMsg.success(true);
                        }

                        if (log.isDebugEnabled()) {
                            log.debug("Send reconnect message to already joined client " +
                                "[clientNode=" + existingNode + ", msg=" + reconMsg + ']');
                        }

                        if (getLocalNodeId().equals(node.clientRouterNodeId())) {
                            ClientMessageWorker wrk = clientMsgWorkers.get(node.id());

                            if (wrk != null)
                                wrk.addMessage(reconMsg);
                            else if (log.isDebugEnabled()) {
                                log.debug("Failed to find client message worker " +
                                    "[clientNode=" + existingNode + ", msg=" + reconMsg + ']');
                            }
                        }
                        else {
                            if (sendMessageToRemotes(reconMsg))
                                sendMessageAcrossRing(reconMsg);
                        }
                    }
                    else if (log.isDebugEnabled())
                        log.debug("Ignoring join request message since node is already in topology: " + msg);

                    abortJoinRequestSpan(msg, "Ignored");

                    return;
                }
                else {
                    if (!node.isClient()) {
                        // Server node ID is not in the ring, but is present in the node IDs history.
                        if (nodesIdsHist.contains(node.id())) {
                            try {
                                trySendMessageDirectly(node, createTcpDiscoveryDuplicateIdMessage(locNodeId, node));
                            }
                            catch (IgniteSpiException e) {
                                if (log.isDebugEnabled()) {
                                    log.debug("Failed to send duplicate ID message to node " +
                                        "[node=" + node +
                                        ", err=" + e.getMessage() + ']');
                                }

                                onException("Failed to send duplicate ID message to node: " + node, e);
                            }

                            abortJoinRequestSpan(msg, "Ignored");

                            return;
                        }
                    }
                }

                IgniteNodeValidationResult err = ensureJoinEnabled(node);

                if (spi.nodeAuth != null && err == null) {
                    // Authenticate node first.
                    try {
                        SecurityCredentials cred = unmarshalCredentials(node);

                        SecurityContext subj = spi.nodeAuth.authenticateNode(node, cred);

                        if (subj == null) {
                            // Node has not passed authentication.
                            String authErrMsg = "Authentication failed [nodeId=" + node.id() +
                                ", addrs=" + U.addressesAsString(node) + ']';

                            LT.warn(log, authErrMsg);

                            // Always output in debug.
                            if (log.isDebugEnabled())
                                log.debug(authErrMsg);

                            try {
                                trySendMessageDirectly(
                                    node,
                                    new TcpDiscoveryAuthFailedMessage(locNodeId, spi.locHost, node.id())
                                );
                            }
                            catch (IgniteSpiException e) {
                                if (log.isDebugEnabled()) {
                                    log.debug("Failed to send unauthenticated message to node " +
                                        "[node=" + node + ", err=" + e.getMessage() + ']');
                                }

                                onException("Failed to send unauthenticated message to node " +
                                    "[node=" + node + ", err=" + e.getMessage() + ']', e);
                            }

                            // Ignore join request.
                            abortJoinRequestSpan(msg, "Ignored");

                            return;
                        }
                        else {
                            if (!(subj instanceof Serializable)) {
                                // Authentication subject cannot be attached to node attributes.
                                String subjErrMsg = "Authentication subject is not Serializable [nodeId=" + node.id() +
                                    ", addrs=" + U.addressesAsString(node) + ']';

                                LT.warn(log, subjErrMsg);

                                // Always output in debug.
                                if (log.isDebugEnabled())
                                    log.debug(subjErrMsg);

                                try {
                                    trySendMessageDirectly(
                                        node,
                                        new TcpDiscoveryAuthFailedMessage(locNodeId, spi.locHost, node.id())
                                    );
                                }
                                catch (IgniteSpiException e) {
                                    if (log.isDebugEnabled()) {
                                        log.debug("Failed to send unauthenticated message to node " +
                                            "[node=" + node + ", err=" + e.getMessage() + ']');
                                    }

                                    // Report the failure the same way as all other direct send failures here.
                                    onException("Failed to send unauthenticated message to node " +
                                        "[node=" + node + ", err=" + e.getMessage() + ']', e);
                                }

                                // Ignore join request.
                                abortJoinRequestSpan(msg, "Ignored");

                                return;
                            }

                            // Stick in authentication subject to node (use security-safe attributes for copy).
                            node.setAttributes(withSecurityContext(subj, node.getAttributes(), spi.marshaller()));
                        }
                    }
                    catch (IgniteException | IgniteCheckedException e) {
                        LT.error(log, e, "Authentication failed [nodeId=" + node.id() + ", addrs=" +
                            U.addressesAsString(node) + ']');

                        if (log.isDebugEnabled()) {
                            log.debug("Failed to authenticate node (will ignore join request) [node=" + node +
                                ", err=" + e + ']');
                        }

                        onException("Failed to authenticate node (will ignore join request) [node=" + node +
                            ", err=" + e + ']', e);

                        // Ignore join request.
                        abortJoinRequestSpan(msg, "Ignored");

                        return;
                    }
                }

                if (err == null)
                    err = spi.getSpiContext().validateNode(node);

                if (err == null) {
                    try {
                        DiscoveryDataBag data = msg.gridDiscoveryData().unmarshalJoiningNodeData(
                            spi.marshaller(),
                            U.resolveClassLoader(spi.ignite().configuration()),
                            false,
                            log
                        );

                        err = spi.getSpiContext().validateNode(node, data);
                    }
                    catch (IgniteCheckedException e) {
                        // Failure to unmarshal joining node data is reported as a validation error.
                        err = new IgniteNodeValidationResult(node.id(), e.getMessage());
                    }
                }

                if (err != null) {
                    final IgniteNodeValidationResult err0 = err;

                    if (log.isDebugEnabled())
                        log.debug("Node validation failed [res=" + err + ", node=" + node + ']');

                    // Event recording, ping and notification of the joining node are done in the utility pool.
                    utilityPool.execute(
                        new Runnable() {
                            @Override public void run() {
                                spi.getSpiContext().recordEvent(new NodeValidationFailedEvent(locNode, node, err0));

                                boolean ping = node.id().equals(err0.nodeId()) ? pingNode(node) : pingNode(err0.nodeId());

                                if (!ping) {
                                    if (log.isDebugEnabled()) {
                                        log.debug("Conflicting node has already left, need to wait for event. " +
                                            "Will ignore join request for now since it will be recent [req=" + msg +
                                            ", err=" + err0.message() + ']');
                                    }

                                    // Ignore join request.
                                    return;
                                }

                                LT.warn(log, err0.message());

                                // Always output in debug.
                                if (log.isDebugEnabled())
                                    log.debug(err0.message());

                                try {
                                    trySendMessageDirectly(node,
                                        new TcpDiscoveryCheckFailedMessage(err0.nodeId(), err0.sendMessage()));
                                }
                                catch (IgniteSpiException e) {
                                    if (log.isDebugEnabled()) {
                                        log.debug("Failed to send hash ID resolver validation failed message to node " +
                                            "[node=" + node + ", err=" + e.getMessage() + ']');
                                    }

                                    onException("Failed to send hash ID resolver validation failed message to node " +
                                        "[node=" + node + ", err=" + e.getMessage() + ']', e);
                                }
                            }
                        }
                    );

                    // Ignore join request.
                    abortJoinRequestSpan(msg, "Ignored");

                    return;
                }

                // Validate marshaller.
                final String locMarsh = locNode.attribute(ATTR_MARSHALLER);
                final String rmtMarsh = node.attribute(ATTR_MARSHALLER);

                if (!Objects.equals(locMarsh, rmtMarsh)) {
                    utilityPool.execute(
                        new Runnable() {
                            @Override public void run() {
                                String errMsg = "Local node's marshaller differs from remote node's marshaller " +
                                    "(to make sure all nodes in topology have identical marshaller, " +
                                    "configure marshaller explicitly in configuration) " +
                                    "[locMarshaller=" + locMarsh + ", rmtMarshaller=" + rmtMarsh +
                                    ", locNodeAddrs=" + U.addressesAsString(locNode) +
                                    ", rmtNodeAddrs=" + U.addressesAsString(node) +
                                    ", locNodeId=" + locNode.id() + ", rmtNodeId=" + msg.creatorNodeId() + ']';

                                LT.warn(log, errMsg);

                                // Always output in debug.
                                if (log.isDebugEnabled())
                                    log.debug(errMsg);

                                try {
                                    // Message for the joining node: "local" and "remote" are swapped to its viewpoint.
                                    String sndMsg = "Local node's marshaller differs from remote node's marshaller " +
                                        "(to make sure all nodes in topology have identical marshaller, " +
                                        "configure marshaller explicitly in configuration) " +
                                        "[locMarshaller=" + rmtMarsh + ", rmtMarshaller=" + locMarsh +
                                        ", locNodeAddrs=" + U.addressesAsString(node) + ", locPort=" + node.discoveryPort() +
                                        ", rmtNodeAddr=" + U.addressesAsString(locNode) + ", locNodeId=" + node.id() +
                                        ", rmtNodeId=" + locNode.id() + ']';

                                    trySendMessageDirectly(node,
                                        new TcpDiscoveryCheckFailedMessage(locNodeId, sndMsg));
                                }
                                catch (IgniteSpiException e) {
                                    if (log.isDebugEnabled())
                                        log.debug("Failed to send marshaller check failed message to node " +
                                            "[node=" + node + ", err=" + e.getMessage() + ']');

                                    onException("Failed to send marshaller check failed message to node " +
                                        "[node=" + node + ", err=" + e.getMessage() + ']', e);
                                }
                            }
                        }
                    );

                    // Ignore join request.
                    abortJoinRequestSpan(msg, "Ignored");

                    return;
                }

                // If node have no value for this attribute then we treat it as true.
                final Boolean locMarshUseDfltSuid = locNode.attribute(ATTR_MARSHALLER_USE_DFLT_SUID);
                boolean locMarshUseDfltSuidBool = locMarshUseDfltSuid == null ? true : locMarshUseDfltSuid;

                final Boolean rmtMarshUseDfltSuid = node.attribute(ATTR_MARSHALLER_USE_DFLT_SUID);
                boolean rmtMarshUseDfltSuidBool = rmtMarshUseDfltSuid == null ? true : rmtMarshUseDfltSuid;

                if (locMarshUseDfltSuidBool != rmtMarshUseDfltSuidBool) {
                    utilityPool.execute(
                        new Runnable() {
                            @Override public void run() {
                                String errMsg = "Local node's " + IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID +
                                    " property value differs from remote node's value " +
                                    "(to make sure all nodes in topology have identical marshaller settings, " +
                                    "configure system property explicitly) " +
                                    "[locMarshUseDfltSuid=" + locMarshUseDfltSuid +
                                    ", rmtMarshUseDfltSuid=" + rmtMarshUseDfltSuid +
                                    ", locNodeAddrs=" + U.addressesAsString(locNode) +
                                    ", rmtNodeAddrs=" + U.addressesAsString(node) +
                                    ", locNodeId=" + locNode.id() + ", rmtNodeId=" + msg.creatorNodeId() + ']';

                                // Message for the joining node: "local" and "remote" are swapped to its viewpoint.
                                String sndMsg = "Local node's " + IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID +
                                    " property value differs from remote node's value " +
                                    "(to make sure all nodes in topology have identical marshaller settings, " +
                                    "configure system property explicitly) " +
                                    "[locMarshUseDfltSuid=" + rmtMarshUseDfltSuid +
                                    ", rmtMarshUseDfltSuid=" + locMarshUseDfltSuid +
                                    ", locNodeAddrs=" + U.addressesAsString(node) + ", locPort=" + node.discoveryPort() +
                                    ", rmtNodeAddr=" + U.addressesAsString(locNode) + ", locNodeId=" + node.id() +
                                    ", rmtNodeId=" + locNode.id() + ']';

                                nodeCheckError(
                                    node,
                                    errMsg,
                                    sndMsg);
                            }
                        });

                    // Ignore join request.
                    abortJoinRequestSpan(msg, "Ignored");

                    return;
                }

                // Validate compact footer flags.
                Boolean locMarshCompactFooter = locNode.attribute(ATTR_MARSHALLER_COMPACT_FOOTER);
                final boolean locMarshCompactFooterBool = locMarshCompactFooter != null ? locMarshCompactFooter : false;

                Boolean rmtMarshCompactFooter = node.attribute(ATTR_MARSHALLER_COMPACT_FOOTER);
                final boolean rmtMarshCompactFooterBool = rmtMarshCompactFooter != null ? rmtMarshCompactFooter : false;

                if (locMarshCompactFooterBool != rmtMarshCompactFooterBool) {
                    utilityPool.execute(
                        new Runnable() {
                            @Override public void run() {
                                String errMsg = "Local node's binary marshaller \"compactFooter\" property differs from " +
                                    "the same property on remote node (make sure all nodes in topology have the same value " +
                                    "of \"compactFooter\" property) [locMarshallerCompactFooter=" + locMarshCompactFooterBool +
                                    ", rmtMarshallerCompactFooter=" + rmtMarshCompactFooterBool +
                                    ", locNodeAddrs=" + U.addressesAsString(locNode) +
                                    ", rmtNodeAddrs=" + U.addressesAsString(node) +
                                    ", locNodeId=" + locNode.id() + ", rmtNodeId=" + msg.creatorNodeId() + ']';

                                // Message for the joining node: "local" and "remote" are swapped to its viewpoint.
                                String sndMsg = "Local node's binary marshaller \"compactFooter\" property differs from " +
                                    "the same property on remote node (make sure all nodes in topology have the same value " +
                                    "of \"compactFooter\" property) [locMarshallerCompactFooter=" + rmtMarshCompactFooterBool +
                                    ", rmtMarshallerCompactFooter=" + locMarshCompactFooterBool +
                                    ", locNodeAddrs=" + U.addressesAsString(node) + ", locPort=" + node.discoveryPort() +
                                    ", rmtNodeAddr=" + U.addressesAsString(locNode) + ", locNodeId=" + node.id() +
                                    ", rmtNodeId=" + locNode.id() + ']';

                                nodeCheckError(
                                    node,
                                    errMsg,
                                    sndMsg);
                            }
                        });

                    // Ignore join request.
                    abortJoinRequestSpan(msg, "Ignored");

                    return;
                }

                // Validate String serialization mechanism used by the BinaryMarshaller.
                final Boolean locMarshStrSerialVer2 = locNode.attribute(ATTR_MARSHALLER_USE_BINARY_STRING_SER_VER_2);
                final boolean locMarshStrSerialVer2Bool = locMarshStrSerialVer2 != null ? locMarshStrSerialVer2 : false;

                final Boolean rmtMarshStrSerialVer2 = node.attribute(ATTR_MARSHALLER_USE_BINARY_STRING_SER_VER_2);
                final boolean rmtMarshStrSerialVer2Bool = rmtMarshStrSerialVer2 != null ? rmtMarshStrSerialVer2 : false;

                if (locMarshStrSerialVer2Bool != rmtMarshStrSerialVer2Bool) {
                    utilityPool.execute(
                        new Runnable() {
                            @Override public void run() {
                                String errMsg = "Local node's " + IGNITE_BINARY_MARSHALLER_USE_STRING_SERIALIZATION_VER_2 +
                                    " property value differs from remote node's value " +
                                    "(to make sure all nodes in topology have identical marshaller settings, " +
                                    "configure system property explicitly) " +
                                    "[locMarshStrSerialVer2=" + locMarshStrSerialVer2 +
                                    ", rmtMarshStrSerialVer2=" + rmtMarshStrSerialVer2 +
                                    ", locNodeAddrs=" + U.addressesAsString(locNode) +
                                    ", rmtNodeAddrs=" + U.addressesAsString(node) +
                                    ", locNodeId=" + locNode.id() + ", rmtNodeId=" + msg.creatorNodeId() + ']';

                                // Message for the joining node: "local" and "remote" are swapped to its viewpoint.
                                String sndMsg = "Local node's " + IGNITE_BINARY_MARSHALLER_USE_STRING_SERIALIZATION_VER_2 +
                                    " property value differs from remote node's value " +
                                    "(to make sure all nodes in topology have identical marshaller settings, " +
                                    "configure system property explicitly) " +
                                    "[locMarshStrSerialVer2=" + rmtMarshStrSerialVer2 +
                                    ", rmtMarshStrSerialVer2=" + locMarshStrSerialVer2 +
                                    ", locNodeAddrs=" + U.addressesAsString(node) + ", locPort=" + node.discoveryPort() +
                                    ", rmtNodeAddr=" + U.addressesAsString(locNode) + ", locNodeId=" + node.id() +
                                    ", rmtNodeId=" + locNode.id() + ']';

                                nodeCheckError(
                                    node,
                                    errMsg,
                                    sndMsg);
                            }
                        });

                    // Ignore join request.
                    abortJoinRequestSpan(msg, "Ignored");

                    return;
                }

                // Validate cache affinity assignment mode. Attributes can be null only in tests.
                Boolean locLateAssign = locNode.attribute(ATTR_LATE_AFFINITY_ASSIGNMENT);
                boolean locLateAssignBool = locLateAssign != null ? locLateAssign : false;

                Boolean rmtLateAssign = node.attribute(ATTR_LATE_AFFINITY_ASSIGNMENT);
                boolean rmtLateAssignBool = rmtLateAssign != null ? rmtLateAssign : false;

                if (locLateAssignBool != rmtLateAssignBool) {
                    String errMsg = "Local node's cache affinity assignment mode differs from " +
                        "the same property on remote node (make sure all nodes in topology have the same " +
                        "cache affinity assignment mode) [locLateAssign=" + locLateAssignBool +
                        ", rmtLateAssign=" + rmtLateAssignBool +
                        ", locNodeAddrs=" + U.addressesAsString(locNode) +
                        ", rmtNodeAddrs=" + U.addressesAsString(node) +
                        ", locNodeId=" + locNode.id() + ", rmtNodeId=" + msg.creatorNodeId() + ']';

                    // Message for the joining node: "local" and "remote" are swapped to its viewpoint.
                    // Normalized values are used on both sides so that a missing attribute is not reported as "null".
                    String sndMsg = "Local node's cache affinity assignment mode differs from " +
                        "the same property on remote node (make sure all nodes in topology have the same " +
                        "cache affinity assignment mode) [locLateAssign=" + rmtLateAssignBool +
                        ", rmtLateAssign=" + locLateAssignBool +
                        ", locNodeAddrs=" + U.addressesAsString(node) + ", locPort=" + node.discoveryPort() +
                        ", rmtNodeAddr=" + U.addressesAsString(locNode) + ", locNodeId=" + node.id() +
                        ", rmtNodeId=" + locNode.id() + ']';

                    nodeCheckError(node, errMsg, sndMsg);

                    // Ignore join request.
                    abortJoinRequestSpan(msg, "Ignored");

                    return;
                }

                // Handle join.
                node.internalOrder(ring.nextNodeOrder());

                if (log.isDebugEnabled())
                    log.debug("Internal order has been assigned to node: " + node);

                DiscoveryDataPacket data = msg.gridDiscoveryData();

                TcpDiscoveryNodeAddedMessage nodeAddedMsg = new TcpDiscoveryNodeAddedMessage(locNodeId,
                    node, data, spi.gridStartTime);

                nodeAddedMsg = tracing.messages().branch(nodeAddedMsg, msg);

                nodeAddedMsg.client(msg.client());

                processNodeAddedMessage(nodeAddedMsg);

                tracing.messages().finishProcessing(nodeAddedMsg);
            }
            else {
                if (sendMessageToRemotes(msg))
                    sendMessageAcrossRing(msg);
            }
        }

        /**
         * Adds the given log point to the span of the join request and ends the span with
         * {@link SpanStatus#ABORTED} status.
         *
         * @param msg Join request message whose span should be ended.
         * @param logPoint Log point to add to the span before it is ended.
         */
        private void abortJoinRequestSpan(TcpDiscoveryJoinRequestMessage msg, String logPoint) {
            msg.spanContainer().span()
                .addLog(() -> logPoint)
                .setStatus(SpanStatus.ABORTED)
                .end();
        }