private void sendMessageAcrossRing()

in modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java [3412:3988]


        /**
         * Sends the given message to the next node in the ring, (re)establishing the connection to that node
         * when no socket is currently open.
         * <p>
         * Before the ring send the message is passed to the registered send listeners, to tracing
         * (for {@link TraceableMessage}s) and to {@code sendMessageToClients}.
         * <p>
         * The main loop ({@code ringLoop}) works as follows:
         * <ol>
         *     <li>Pick the next node from the ring, excluding the locally collected failed nodes. If there is
         *     none, the loop ends (the message may be handed back to {@code addMessage} first).</li>
         *     <li>Iterate over the addresses of the next node ({@code addr} loop), skipping addresses that belong
         *     to the local node.</li>
         *     <li>For each address run a reconnect loop: if there is no open socket, open one and perform
         *     a handshake; then optionally write pending messages, write the message itself and read
         *     the receipt.</li>
         *     <li>If the message was not sent through any address, the next node is added to the local failed
         *     list (or the connection recovery state is consulted, when it is in use) and the loop repeats.</li>
         * </ol>
         * After the loop, nodes that were detected as failed by this call are registered in the shared failed
         * nodes map and a {@link TcpDiscoveryNodeFailedMessage} is queued for each of them.
         * <p>
         * The method may return early, after calling {@code segmentLocalNodeOnSendFail}, when the connection
         * recovery state reports a timeout or a failure.
         *
         * @param msg Message to send. Must not be {@code null}.
         */
        private void sendMessageAcrossRing(TcpDiscoveryAbstractMessage msg) {
            assert msg != null;

            assert ring.hasRemoteNodes();

            // Notify all registered send listeners before anything is written.
            for (IgniteInClosure<TcpDiscoveryAbstractMessage> msgLsnr : spi.sndMsgLsnrs)
                msgLsnr.apply(msg);

            if (msg instanceof TraceableMessage)
                tracing.messages().beforeSend((TraceableMessage)msg);

            sendMessageToClients(msg);

            // Local working copy of the failed nodes; it is extended below when a send to the next node fails.
            List<TcpDiscoveryNode> failedNodes;

            TcpDiscoverySpiState state;

            // Snapshot the shared failed nodes and the SPI state under the lock.
            synchronized (mux) {
                failedNodes = U.arrayList(ServerImpl.this.failedNodes.keySet());

                state = spiState;
            }

            // Errors collected while trying to reach the current next node.
            Collection<Throwable> errs = null;

            boolean sent = false;

            // Set when the next node was changed by this call; forces sending of pending messages (see below).
            boolean newNextNode = false;

            // Used only if spi.getEffectiveConnectionRecoveryTimeout > 0
            CrossRingMessageSendState sndState = null;

            UUID locNodeId = getLocalNodeId();

            ringLoop: while (true) {
                TcpDiscoveryNode newNext = ring.nextNode(failedNodes);

                if (newNext == null) {
                    if (log.isDebugEnabled())
                        log.debug("No next node in topology.");

                    if (debugMode)
                        debugLog(msg, "No next node in topology.");

                    // Hand the message back to addMessage(), except for connection check messages and
                    // status check messages created by the local node.
                    if (ring.hasRemoteNodes() && !(msg instanceof TcpDiscoveryConnectionCheckMessage) &&
                        !(msg instanceof TcpDiscoveryStatusCheckMessage && msg.creatorNodeId().equals(locNodeId))) {
                        msg.senderNodeId(locNodeId);

                        addMessage(msg, true);
                    }

                    break;
                }

                if (!newNext.equals(next)) {
                    if (log.isDebugEnabled()) {
                        log.debug("New next node [newNext=" + newNext + ", formerNext=" + next +
                            ", ring=" + ring + ", failedNodes=" + failedNodes + ']');
                    }
                    else if (log.isInfoEnabled())
                        log.info("New next node [newNext=" + newNext + ']');

                    if (debugMode)
                        debugLog(msg, "New next node [newNext=" + newNext + ", formerNext=" + next +
                            ", ring=" + ring + ", failedNodes=" + failedNodes + ']');

                    // The open socket (if any) belongs to the former next node: drop it so that a new
                    // connection with a handshake is established below.
                    U.closeQuiet(sock);

                    sock = null;

                    newNextNode(newNext);

                    newNextNode = true;
                }
                else if (log.isTraceEnabled())
                    log.trace("Next node remains the same [nextId=" + next.id() +
                        ", nextOrder=" + next.internalOrder() + ']');

                List<InetSocketAddress> locNodeAddrs = U.arrayList(locNode.socketAddresses());

                addr: for (InetSocketAddress addr : nextAddrs) {
                    // Per-address ack timeout; may be doubled on socket timeouts in the catch blocks below.
                    long ackTimeout0 = spi.getAckTimeout();

                    if (locNodeAddrs.contains(addr)) {
                        if (log.isDebugEnabled()) {
                            log.debug("Skip to send message to the local node (probably remote node has the same " +
                                "loopback address that local node): " + addr);
                        }

                        continue;
                    }

                    // Number of failed attempts for this address; only compared against the reconnect count
                    // when the failure detection timeout is disabled.
                    int reconCnt = 0;

                    IgniteSpiOperationTimeoutHelper timeoutHelper = null;

                    // Reconnect loop for the current address. It is left by 'break' (give up on this address),
                    // 'break addr' (message sent), 'continue ringLoop' or 'return'.
                    while (true) {
                        if (sock == null) {
                            // We re-create the helper here because it could be created earlier with wrong timeout on
                            // message sending like IgniteConfiguration.failureDetectionTimeout. Here we are in the
                            // state of connection recovering and have to work with
                            // TcpDiscoverySpi.getEffectiveConnectionRecoveryTimeout()
                            if (timeoutHelper == null || sndState != null)
                                timeoutHelper = serverOperationTimeoutHelper(sndState, -1);

                            // Set only after a fully successful handshake; otherwise the socket is closed
                            // in the finally block.
                            boolean success = false;

                            // Set once the socket and its output stream were obtained; a failure before
                            // this point is not retried.
                            boolean openSock = false;

                            // Restore ring.
                            try {
                                sock = spi.openSocket(addr, timeoutHelper);

                                out = spi.socketStream(sock);

                                openSock = true;

                                // Handshake.
                                TcpDiscoveryHandshakeRequest hndMsg = new TcpDiscoveryHandshakeRequest(locNodeId);

                                // Topology treated as changes if next node is not available.
                                boolean changeTop = sndState != null && !sndState.isStartingPoint();

                                if (changeTop)
                                    hndMsg.changeTopology(ring.previousNodeOf(next).id());

                                if (log.isDebugEnabled()) {
                                    log.debug("Sending handshake [hndMsg=" + hndMsg + ", sndState=" + sndState +
                                        "] with timeout " + timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
                                }

                                spi.writeToSocket(sock, out, hndMsg,
                                    timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));

                                if (log.isDebugEnabled()) {
                                    log.debug("Reading handshake response with timeout " +
                                        timeoutHelper.nextTimeoutChunk(ackTimeout0));
                                }

                                TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null,
                                    timeoutHelper.nextTimeoutChunk(ackTimeout0));

                                if (log.isDebugEnabled())
                                    log.debug("Handshake response: " + res);

                                // We should take previousNodeAlive flag into account
                                // only if we received the response from the correct node.
                                if (res.creatorNodeId().equals(next.id()) && res.previousNodeAlive() && sndState != null) {
                                    sndState.checkTimeout();

                                    // Remote node checked connection to its previous and got success.
                                    boolean previousNode = sndState.markLastFailedNodeAlive();

                                    // Either un-fail the most recently added failed node, or re-resolve
                                    // the next node from the ring with the current failed list.
                                    if (previousNode)
                                        failedNodes.remove(failedNodes.size() - 1);
                                    else {
                                        newNextNode = false;

                                        newNextNode(ring.nextNode(failedNodes));
                                    }

                                    U.closeQuiet(sock);

                                    sock = null;

                                    if (sndState.isFailed()) {
                                        segmentLocalNodeOnSendFail(failedNodes);

                                        return; // Nothing to do here.
                                    }

                                    if (previousNode)
                                        U.warn(log, "New next node has connection to it's previous, trying previous " +
                                            "again. [next=" + next + ']');

                                    // Restart from next node selection.
                                    continue ringLoop;
                                }

                                // The address led back to the local node: give up on this address.
                                if (locNodeId.equals(res.creatorNodeId())) {
                                    if (log.isDebugEnabled())
                                        log.debug("Handshake response from local node: " + res);

                                    U.closeQuiet(sock);

                                    sock = null;

                                    break;
                                }

                                UUID nextId = res.creatorNodeId();

                                long nextOrder = res.order();

                                if (!next.id().equals(nextId)) {
                                    // Node with different ID is bound to the same port.
                                    if (log.isDebugEnabled()) {
                                        log.debug("Failed to restore ring because next node ID received is not as " +
                                            "expected [expectedId=" + next.id() + ", rcvdId=" + nextId + ']');
                                    }

                                    if (debugMode) {
                                        debugLog(msg, "Failed to restore ring because next node ID received is not " +
                                            "as expected [expectedId=" + next.id() + ", rcvdId=" + nextId + ']');
                                    }

                                    break;
                                }
                                else {
                                    // ID is as expected. Check node order.
                                    if (nextOrder != next.internalOrder()) {
                                        // Is next currently being added?
                                        boolean nextNew = (msg instanceof TcpDiscoveryNodeAddedMessage &&
                                            ((TcpDiscoveryNodeAddedMessage)msg).node().id().equals(nextId));

                                        if (!nextNew)
                                            nextNew = hasPendingAddMessage(nextId);

                                        // An order mismatch is tolerated only for a node that is being added.
                                        if (!nextNew) {
                                            if (log.isDebugEnabled()) {
                                                log.debug("Failed to restore ring because next node order received " +
                                                    "is not as expected [expected=" + next.internalOrder() +
                                                    ", rcvd=" + nextOrder + ", id=" + next.id() + ']');
                                            }

                                            if (debugMode) {
                                                debugLog(msg, "Failed to restore ring because next node order " +
                                                    "received is not as expected [expected=" + next.internalOrder() +
                                                    ", rcvd=" + nextOrder + ", id=" + next.id() + ']');
                                            }

                                            break;
                                        }
                                    }

                                    updateLastSentMessageTime();

                                    if (log.isDebugEnabled())
                                        log.debug("Initialized connection with next node: " + next.id());

                                    if (debugMode)
                                        debugLog(msg, "Initialized connection with next node: " + next.id());

                                    // Connection is established: errors collected so far are dropped.
                                    errs = null;

                                    success = true;

                                    next.lastSuccessfulAddress(addr);
                                }
                            }
                            catch (IOException | IgniteCheckedException e) {
                                if (errs == null)
                                    errs = new ArrayList<>();

                                errs.add(e);

                                if (log.isDebugEnabled())
                                    U.error(log, "Failed to connect to next node [msg=" + msg
                                        + ", err=" + e.getMessage() + ']', e);

                                onException("Failed to connect to next node [msg=" + msg + ", err=" + e + ']', e);

                                // Fastens failure detection.
                                if (sndState != null && sndState.checkTimeout()) {
                                    segmentLocalNodeOnSendFail(failedNodes);

                                    return; // Nothing to do here.
                                }

                                if (!openSock)
                                    break; // Don't retry if we can not establish connection.

                                if (!spi.failureDetectionTimeoutEnabled() && ++reconCnt == spi.getReconnectCount())
                                    break;

                                if (spi.failureDetectionTimeoutEnabled() && IgniteSpiOperationTimeoutHelper.checkFailureTimeoutReached(e))
                                    break;
                                else if (!spi.failureDetectionTimeoutEnabled() && (e instanceof
                                    SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class))) {
                                    // Retry with a doubled ack timeout, as long as checkAckTimeout() accepts it.
                                    ackTimeout0 *= 2;

                                    if (!checkAckTimeout(ackTimeout0))
                                        break;
                                }

                                // Retry the handshake on the same address; the finally block below closes
                                // the socket first because 'success' is still false.
                                continue;
                            }
                            finally {
                                if (!success) {
                                    if (log.isDebugEnabled())
                                        log.debug("Closing socket to next: " + next);

                                    U.closeQuiet(sock);

                                    sock = null;
                                }
                                else {
                                    // Resetting timeout control object to let the code below to use a new one
                                    // for the next bunch of operations.
                                    timeoutHelper = null;
                                }
                            }
                        }

                        // At this point a socket to the next node is open (either reused or just created).
                        try {
                            boolean failure;

                            // The local list holds more failed nodes than the shared map, i.e. this call
                            // has detected failures that are not registered yet.
                            synchronized (mux) {
                                failure = ServerImpl.this.failedNodes.size() < failedNodes.size();
                            }

                            assert !forceSndPending || msg instanceof TcpDiscoveryNodeLeftMessage;

                            // Send the pending messages first when a failure was detected, when the next
                            // node has changed or when it was explicitly requested.
                            if (failure || forceSndPending || newNextNode) {
                                if (log.isDebugEnabled()) {
                                    log.debug("Pending messages will be sent [failure=" + failure +
                                        ", newNextNode=" + newNextNode +
                                        ", forceSndPending=" + forceSndPending +
                                        ", failedNodes=" + failedNodes + ']');
                                }

                                if (debugMode) {
                                    debugLog(msg, "Pending messages will be sent [failure=" + failure +
                                        ", newNextNode=" + newNextNode +
                                        ", forceSndPending=" + forceSndPending +
                                        ", failedNodes=" + failedNodes + ']');
                                }

                                for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs) {
                                    long tsNanos = System.nanoTime();

                                    prepareNodeAddedMessage(pendingMsg, next.id(), pendingMsgs.msgs,
                                        pendingMsgs.customDiscardId);

                                    addFailedNodes(pendingMsg, failedNodes);

                                    if (timeoutHelper == null)
                                        timeoutHelper = serverOperationTimeoutHelper(sndState, lastRingMsgSentTime);

                                    // Whatever prepareNodeAddedMessage() set up is cleared again even if
                                    // the write fails.
                                    try {
                                        spi.writeToSocket(sock, out, pendingMsg, timeoutHelper.nextTimeoutChunk(
                                            spi.getSocketTimeout()));
                                    }
                                    finally {
                                        clearNodeAddedMessage(pendingMsg);
                                    }

                                    long tsNanos0 = System.nanoTime();

                                    int res = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0));

                                    updateLastSentMessageTime();

                                    // Reported duration covers preparation and the write, not the receipt read.
                                    spi.stats.onMessageSent(pendingMsg, U.nanosToMillis(tsNanos0 - tsNanos));

                                    if (log.isDebugEnabled()) {
                                        log.debug("Pending message has been sent to next node [msgId=" + msg.id() +
                                            ", pendingMsgId=" + pendingMsg.id() + ", next=" + next.id() +
                                            ", res=" + res + ']');
                                    }

                                    if (debugMode) {
                                        debugLog(msg, "Pending message has been sent to next node [msgId=" + msg.id() +
                                            ", pendingMsgId=" + pendingMsg.id() + ", next=" + next.id() +
                                            ", res=" + res + ']');
                                    }

                                    // Resetting timeout control object to create a new one for the next bunch of
                                    // operations.
                                    timeoutHelper = null;
                                }
                            }

                            // Connection check messages are sent without the node-added preparation.
                            if (!(msg instanceof TcpDiscoveryConnectionCheckMessage))
                                prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs,
                                    pendingMsgs.customDiscardId);

                            try {
                                // Serialize version is switched for the duration of the write and restored
                                // in the finally block below.
                                SecurityUtils.serializeVersion(1);

                                long tsNanos = System.nanoTime();

                                if (timeoutHelper == null)
                                    timeoutHelper = serverOperationTimeoutHelper(sndState, lastRingMsgSentTime);

                                addFailedNodes(msg, failedNodes);

                                boolean latencyCheck = msg instanceof TcpDiscoveryRingLatencyCheckMessage;

                                if (latencyCheck && log.isInfoEnabled())
                                    log.info("Latency check message has been written to socket: " + msg.id());

                                spi.writeToSocket(newNextNode ? newNext : next,
                                    sock,
                                    out,
                                    msg,
                                    timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));

                                long tsNanos0 = System.nanoTime();

                                int res = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0));

                                updateLastSentMessageTime();

                                if (latencyCheck && log.isInfoEnabled())
                                    log.info("Latency check message has been acked: " + msg.id());

                                // Reported duration covers the write only, not the receipt read.
                                spi.stats.onMessageSent(msg, U.nanosToMillis(tsNanos0 - tsNanos));

                                onMessageExchanged();

                                DebugLogger debugLog = messageLogger(msg);

                                if (debugLog.isDebugEnabled()) {
                                    debugLog.debug("Message has been sent to next node [msg=" + msg +
                                        ", next=" + next.id() +
                                        ", res=" + res + ']');
                                }

                                if (debugMode) {
                                    debugLog(msg, "Message has been sent to next node [msg=" + msg +
                                        ", next=" + next.id() +
                                        ", res=" + res + ']');
                                }
                            }
                            finally {
                                SecurityUtils.restoreDefaultSerializeVersion();

                                clearNodeAddedMessage(msg);
                            }

                            // Reached only when the write and the receipt read both succeeded.
                            registerPendingMessage(msg);

                            sent = true;

                            // Message is delivered: no need to try the remaining addresses.
                            break addr;
                        }
                        catch (IOException | IgniteCheckedException e) {
                            if (errs == null)
                                errs = new ArrayList<>();

                            errs.add(e);

                            if (log.isDebugEnabled())
                                U.error(log, "Failed to send message to next node [next=" + next.id() + ", msg=" + msg +
                                    ", err=" + e + ']', e);

                            onException("Failed to send message to next node [next=" + next.id() + ", msg=" + msg + ']',
                                e);

                            if (spi.failureDetectionTimeoutEnabled() && IgniteSpiOperationTimeoutHelper.checkFailureTimeoutReached(e))
                                break;

                            if (!spi.failureDetectionTimeoutEnabled()) {
                                if (++reconCnt == spi.getReconnectCount())
                                    break;
                                else if (e instanceof SocketTimeoutException ||
                                    X.hasCause(e, SocketTimeoutException.class)) {
                                    ackTimeout0 *= 2;

                                    if (!checkAckTimeout(ackTimeout0))
                                        break;
                                }
                            }

                            // No 'break' was hit: the reconnect loop runs again. The finally block below
                            // closes the socket, so the next iteration starts with a new handshake.
                        }
                        finally {
                            // The flag is consumed by the first send attempt, whether it succeeded or not.
                            forceSndPending = false;

                            if (!sent) {
                                if (log.isDebugEnabled())
                                    log.debug("Closing socket to next (not sent): " + next);

                                U.closeQuiet(sock);

                                sock = null;

                                if (log.isDebugEnabled()) {
                                    log.debug("Message has not been sent [next=" + next.id() + ", msg=" + msg +
                                        (!spi.failureDetectionTimeoutEnabled() ? ", i=" + reconCnt : "") + ']');
                                }
                            }
                        }
                    } // Try to reconnect.
                } // Iterating node's addresses.

                if (!sent) {
                    // The first failure creates the recovery state (when the recovery timeout is positive);
                    // on later failures the existing state is checked for a timeout.
                    if (sndState == null && spi.getEffectiveConnectionRecoveryTimeout() > 0)
                        sndState = new CrossRingMessageSendState();
                    else if (sndState != null && sndState.checkTimeout()) {
                        segmentLocalNodeOnSendFail(failedNodes);

                        return; // Nothing to do here.
                    }

                    // Without a recovery state the next node is always treated as failed.
                    boolean failedNextNode = sndState == null || sndState.markNextNodeFailed();

                    if (failedNextNode && !failedNodes.contains(next)) {
                        failedNodes.add(next);

                        if (state == CONNECTED) {
                            Exception err = errs != null ?
                                U.exceptionWithSuppressed("Failed to send message to next node [msg=" + msg +
                                    ", next=" + U.toShortString(next) + ']', errs) :
                                null;

                            // If node existed on connection initialization we should check
                            // whether it has not gone yet.
                            U.warn(log, "Failed to send message to next node [msg=" + msg + ", next=" + next +
                                ", errMsg=" + (err != null ? err.getMessage() : "N/A") + ']');
                        }
                    }
                    else if (!failedNextNode && sndState != null && sndState.isBackward()) {
                        boolean prev = sndState.markLastFailedNodeAlive();

                        U.warn(log, "Failed to send message to next node, try previous [msg=" + msg +
                            ", next=" + next + ']');

                        // Same handling as for a handshake response with the previous-node-alive flag.
                        if (prev)
                            failedNodes.remove(failedNodes.size() - 1);
                        else {
                            newNextNode = false;

                            newNextNode(ring.nextNode(failedNodes));
                        }
                    }

                    // NOTE(review): presumably resets the current next node, so that the next iteration
                    // takes the "new next node" branch; the 'next == null' assertion after the loop
                    // appears to rely on it - confirm against newNextNode().
                    newNextNode(null);

                    errs = null;
                }
                else
                    break;
            }

            // Keep only the nodes that were detected as failed by this call, i.e. the ones that are not
            // present in the shared failed nodes map.
            synchronized (mux) {
                failedNodes.removeAll(ServerImpl.this.failedNodes.keySet());
            }

            if (!failedNodes.isEmpty()) {
                if (state == CONNECTED) {
                    if (!sent && log.isDebugEnabled())
                        // Message has not been sent due to some problems.
                        log.debug("Message has not been sent: " + msg);

                    if (log.isDebugEnabled())
                        log.debug("Detected failed nodes: " + failedNodes);
                }

                // Register the newly failed nodes in the shared map, with the local node ID as the value,
                // and record their IDs in failedNodesMsgSent.
                synchronized (mux) {
                    for (TcpDiscoveryNode failedNode : failedNodes) {
                        if (!ServerImpl.this.failedNodes.containsKey(failedNode))
                            ServerImpl.this.failedNodes.put(failedNode, locNodeId);
                    }

                    for (TcpDiscoveryNode failedNode : failedNodes)
                        failedNodesMsgSent.add(failedNode.id());
                }

                // Queue a node failed message for every newly failed node.
                for (TcpDiscoveryNode n : failedNodes)
                    msgWorker.addMessage(new TcpDiscoveryNodeFailedMessage(locNodeId, n.id(), n.internalOrder()));

                if (!sent) {
                    assert next == null : next;

                    if (log.isDebugEnabled())
                        log.debug("Pending messages will be resent to local node");

                    if (debugMode)
                        debugLog(msg, "Pending messages will be resent to local node");

                    processPendingMessagesLocally(msg);
                }

                LT.warn(log, "Local node has detected failed nodes and started cluster-wide procedure. " +
                        "To speed up failure detection please see 'Failure Detection' section under javadoc" +
                        " for 'TcpDiscoverySpi'");
            }
        }