public GridNioSession createNioSession()

in modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java [350:663]


    /**
     * Opens an outgoing communication session to {@code node} for the given connection index.
     * <p>
     * Addresses returned by {@code nodeAddresses(...)} are tried one by one. Unresolved addresses are recorded as
     * failed and skipped; addresses recognized by {@code isLocalNodeAddress(...)} are skipped and counted separately.
     * For every remaining address a fresh {@link ExponentialBackoffTimeoutStrategy} is created. The method then
     * repeatedly opens a blocking socket channel, connects, performs the handshake and registers the channel with
     * {@code nioSrv}. It stops when a session is created, when the strategy reports its total timeout exceeded, or
     * when a non-recoverable error occurs. Once a session is created, the remaining addresses are not tried.
     * <p>
     * The per-address time budget depends on the configuration:
     * <ul>
     *     <li>if the failure detection timeout is enabled, it is the (client) failure detection timeout;</li>
     *     <li>otherwise it is the total back-off time computed from the connection timeout, maximum connection
     *     timeout and reconnect count.</li>
     * </ul>
     *
     * @param node Remote node to connect to.
     * @param connIdx Connection index; used in the {@link ConnectionKey} and sent in the handshake message.
     * @return Created session, or {@code null} in any of these cases:
     *      <ul>
     *          <li>the outgoing recovery descriptor could not be reserved;</li>
     *          <li>the remote node answered the handshake with {@code ALREADY_CONNECTED};</li>
     *          <li>no session was created and {@code processSessionCreationError(...)} returned normally
     *          (NOTE(review): that helper presumably always throws; confirm).</li>
     *      </ul>
     * @throws NodeUnreachableException When all of the following hold:
     *      <ul>
     *          <li>the local node is a server;</li>
     *          <li>the current thread is not an {@link IgniteDiscoveryThread};</li>
     *          <li>the remote node is a client that either runs in {@code forceClientToServerConnections} mode, or
     *          has all of its non-local addresses failed (the latter only unless both sides use paired
     *          connections).</li>
     *      </ul>
     *      The exception message states that an inverse connection will be requested.
     * @throws ClusterTopologyCheckedException If {@code nodeGetter} no longer returns the remote node (it left the
     *      topology), or if the remote node replied to the handshake that it is stopping.
     * @throws IgniteTooManyOpenFilesException If a failure has a {@link SocketException} cause mentioning
     *      "Too many open files".
     * @throws IgniteSpiException If the {@code stopping} flag is set while attempting to connect.
     * @throws IgniteCheckedException Other connection failures, as reported through
     *      {@code processSessionCreationError(...)}.
     */
    public GridNioSession createNioSession(ClusterNode node, int connIdx) throws IgniteCheckedException {
        boolean locNodeIsSrv = !locNodeSupplier.get().isClient();

        // A server node (outside the discovery thread) does not even try to connect to a client started in
        // 'forceClientToServerConnections' mode. It fails fast with NodeUnreachableException, whose message says
        // that an inverse connection will be requested instead.
        if (!(Thread.currentThread() instanceof IgniteDiscoveryThread) && locNodeIsSrv) {
            if (node.isClient() && forceClientToServerConnections(node)) {
                String msg = "Failed to connect to node " + node.id() + " because it is started" +
                    " in 'forceClientToServerConnections' mode; inverse connection will be requested.";

                throw new NodeUnreachableException(msg);
            }
        }

        Collection<InetSocketAddress> addrs = nodeAddresses(node, cfg.filterReachableAddresses(), attrs, locNodeSupplier);

        // Session being established; stays null until a handshake succeeds and the NIO server registers the channel.
        GridNioSession ses = null;
        // First recorded connection failure; later failures are attached to it as suppressed exceptions.
        IgniteCheckedException errs = null;

        // Total time budget for the connection attempts to a single address.
        long totalTimeout;

        if (cfg.failureDetectionTimeoutEnabled())
            totalTimeout = node.isClient() ? stateProvider.clientFailureDetectionTimeout() : cfg.failureDetectionTimeout();
        else {
            totalTimeout = ExponentialBackoffTimeoutStrategy.totalBackoffTimeout(
                cfg.connectionTimeout(),
                cfg.maxConnectionTimeout(),
                cfg.reconCount()
            );
        }

        // Addresses counted as failed when deciding whether to request an inverse connection. These are unresolved
        // addresses and, for client nodes only, addresses whose failure isNodeUnreachableException(...) accepts.
        Set<InetSocketAddress> failedAddrsSet = new HashSet<>();
        // Number of addresses skipped because they belong to the local node.
        int skippedAddrs = 0;

        for (InetSocketAddress addr : addrs) {
            if (addr.isUnresolved()) {
                failedAddrsSet.add(addr);

                continue;
            }

            // Each address gets its own back-off strategy, i.e. its own time budget and timeout progression.
            TimeoutStrategy connTimeoutStgy = new ExponentialBackoffTimeoutStrategy(
                totalTimeout,
                cfg.failureDetectionTimeoutEnabled() ? DFLT_INITIAL_TIMEOUT : cfg.connectionTimeout(),
                cfg.maxConnectionTimeout()
            );

            while (ses == null) { // Retry loop for this address: handshake timeout, NEED_WAIT or recoverable error.
                if (stopping)
                    throw new IgniteSpiException("Node is stopping.");

                // NOTE(review): this check appears to depend only on addr, so it could be done once before the loop.
                if (isLocalNodeAddress(addr)) {
                    if (log.isDebugEnabled())
                        log.debug("Skipping local address [addr=" + addr +
                            ", locAddrs=" + node.attribute(attrs.addresses()) +
                            ", node=" + node + ']');

                    skippedAddrs++;

                    break;
                }

                // Last timeout taken from connTimeoutStgy; used only in log messages below.
                long timeout = 0;

                // Every enter() here is paired with leave() in the finally block at the end of this try.
                connectGate.enter();

                try {
                    if (nodeGetter.apply(node.id()) == null)
                        throw new ClusterTopologyCheckedException("Failed to send message (node left topology): " + node);

                    SocketChannel ch = socketChannelFactory.get();

                    // Connect and handshake are performed on a blocking channel.
                    ch.configureBlocking(true);

                    ch.socket().setTcpNoDelay(cfg.tcpNoDelay());
                    ch.socket().setKeepAlive(true);

                    // Buffer sizes are applied only when configured with a positive value.
                    if (cfg.socketReceiveBuffer() > 0)
                        ch.socket().setReceiveBufferSize(cfg.socketReceiveBuffer());

                    if (cfg.socketSendBuffer() > 0)
                        ch.socket().setSendBufferSize(cfg.socketSendBuffer());

                    ConnectionKey connKey = new ConnectionKey(node.id(), connIdx, -1);

                    GridNioRecoveryDescriptor recoveryDesc = outRecoveryDescriptor(node, connKey);

                    assert recoveryDesc != null :
                        "Recovery descriptor not found [connKey=" + connKey + ", rmtNode=" + node.id() + ']';

                    // If the outgoing recovery descriptor cannot be reserved, then:
                    //  - the new channel is dropped;
                    //  - the session currently attached to the descriptor (if any) is closed;
                    //  - null is returned.
                    // NOTE(review): a failed reserve presumably means another connection attempt holds the
                    // descriptor; confirm.
                    if (!recoveryDesc.reserve()) {
                        U.closeQuiet(ch);

                        // Ensure the session is closed: close() is called until closeTime() becomes non-zero.
                        // NOTE(review): this is a busy-spin with no back-off.
                        GridNioSession sesFromRecovery = recoveryDesc.session();

                        if (sesFromRecovery != null) {
                            while (sesFromRecovery.closeTime() == 0)
                                sesFromRecovery.close();
                        }

                        return null;
                    }

                    long rcvCnt;

                    Map<Integer, Object> meta = new HashMap<>();

                    GridSslMeta sslMeta = null;

                    // This iteration may end without a session: on an exception, on the early return for
                    // ALREADY_CONNECTED, or on 'continue' for NEED_WAIT. In every such case the finally below
                    // closes the channel and releases the reserved recovery descriptor.
                    try {
                        // Connect timeout is the strategy's next timeout.
                        timeout = connTimeoutStgy.nextTimeout();

                        ch.socket().connect(addr, (int)timeout);

                        // Re-check that the node is still in topology after the (possibly long) connect.
                        if (nodeGetter.apply(node.id()) == null)
                            throw new ClusterTopologyCheckedException("Failed to send message (node left topology): " + node);

                        // With SSL enabled, a client-mode SSL engine is stored in the session metadata and passed to
                        // the handshake.
                        if (stateProvider.isSslEnabled()) {
                            meta.put(SSL_META.ordinal(), sslMeta = new GridSslMeta());

                            SSLEngine sslEngine = stateProvider.createSSLEngine();

                            sslEngine.setUseClientMode(true);

                            sslMeta.sslEngine(sslEngine);
                        }

                        ClusterNode locNode = locNodeSupplier.get();

                        if (locNode == null)
                            throw new IgniteCheckedException("Local node has not been started or " +
                                "fully initialized [isStopping=" + stateProvider.isStopping() + ']');

                        // Handshake timeout: the strategy's next timeout, given the one used for connect.
                        timeout = connTimeoutStgy.nextTimeout(timeout);

                        // A non-negative rcvCnt is passed to recoveryDesc.onHandshake(). Negative values are reply
                        // codes handled below; any other negative value is rejected.
                        rcvCnt = safeTcpHandshake(ch,
                            node.id(),
                            timeout,
                            sslMeta,
                            new HandshakeMessage2(locNode.id(),
                                recoveryDesc.incrementConnectCount(),
                                recoveryDesc.received(),
                                connIdx));

                        if (rcvCnt == ALREADY_CONNECTED)
                            return null;
                        else if (rcvCnt == NODE_STOPPING) {
                            // Safe to remap on remote node stopping.
                            throw new ClusterTopologyCheckedException("Remote node started stop procedure: " + node.id());
                        }
                        else if (rcvCnt == UNKNOWN_NODE)
                            throw new IgniteCheckedException("Remote node does not observe current node " +
                                "in topology : " + node.id());
                        else if (rcvCnt == NEED_WAIT) {
                            // Should wait for discovery lag without applying connTimeoutStgy, otherwise cache operations
                            // may hang on waiting inexistent topology, see IgniteClientConnectTest for test
                            // scenarios with delayed client node join.
                            // 'continue' retries the same address. Both finally blocks still run (channel closed,
                            // descriptor released, gate left), but the heartbeat update at the end of the loop body
                            // is skipped.
                            if (log.isDebugEnabled())
                                log.debug("NEED_WAIT received, handshake after delay [node = "
                                    + node + ", outOfTopologyDelay = " + DFLT_NEED_WAIT_DELAY + "ms]");

                            U.sleep(DFLT_NEED_WAIT_DELAY);

                            continue;
                        }
                        else if (rcvCnt < 0)
                            throw new IgniteCheckedException("Unsupported negative receivedCount [rcvCnt=" + rcvCnt +
                                ", senderNode=" + node + ']');

                        recoveryDesc.onHandshake(rcvCnt);

                        meta.put(CONSISTENT_ID_META, node.consistentId());
                        meta.put(CONN_IDX_META, connKey);
                        meta.put(GridNioServer.RECOVERY_DESC_META_KEY, recoveryDesc);

                        // Register the connected channel with the NIO server and wait for the result via get().
                        ses = nioSrv.createSession(ch, meta, false, null).get();
                    }
                    finally {
                        // No session: nothing owns the channel or the reserved descriptor any more.
                        if (ses == null) {
                            U.closeQuiet(ch);

                            if (recoveryDesc != null)
                                recoveryDesc.release();
                        }
                    }
                }
                catch (IgniteSpiOperationTimeoutException e) { // Handshake is timed out.
                    // Retry the same address with the strategy's next timeout, unless the strategy reports that the
                    // time budget is exhausted.
                    // NOTE(review): ses is assigned only as the last statement of the try, so it appears to always
                    // be null here; this branch looks purely defensive.
                    if (ses != null) {
                        ses.close();

                        ses = null;
                    }

                    eRegistrySupplier.get().onException(
                        "Handshake timed out (will retry with increased timeout) [connTimeoutStrategy=" + connTimeoutStgy +
                        ", addr=" + addr + ']', e);

                    if (log.isDebugEnabled())
                        log.debug("Handshake timed out (will retry with increased timeout) [connTimeoutStrategy=" + connTimeoutStgy +
                            ", addr=" + addr + ", err=" + e + ']'
                        );

                    // Time budget exhausted: record the error and move on to the next address.
                    if (connTimeoutStgy.checkTimeout()) {
                        U.warn(log, "Handshake timed out (will stop attempts to perform the handshake) " +
                            "[node=" + node.id() + ", connTimeoutStrategy=" + connTimeoutStgy +
                            ", err=" + e.getMessage() + ", addr=" + addr +
                            ", failureDetectionTimeoutEnabled=" + cfg.failureDetectionTimeoutEnabled() +
                            ", timeout=" + timeout + ']');

                        String msg = "Failed to connect to node (is node still alive?). " +
                            "Make sure that each ComputeTask and cache Transaction has a timeout set " +
                            "in order to prevent parties from waiting forever in case of network issues " +
                            "[nodeId=" + node.id() + ", addrs=" + addrs + ']';

                        if (errs == null)
                            errs = new IgniteCheckedException(msg, e);
                        else
                            errs.addSuppressed(new IgniteCheckedException(msg, e));

                        break;
                    }
                }
                catch (ClusterTopologyCheckedException e) {
                    // Topology changes (node left or is stopping) are propagated immediately, without retries.
                    throw e;
                }
                catch (Exception e) {
                    // Most probably IO error on socket connect or handshake.
                    // The UNKNOWN_NODE and unsupported negative rcvCnt errors thrown above also end up here.
                    // NOTE(review): as in the handshake-timeout branch, ses appears to always be null here.
                    if (ses != null) {
                        ses.close();

                        ses = null;
                    }

                    eRegistrySupplier.get().onException("Client creation failed [addr=" + addr + ", err=" + e + ']', e);

                    if (log.isDebugEnabled())
                        log.debug("Client creation failed [addr=" + addr + ", err=" + e + ']');

                    if (X.hasCause(e, "Too many open files", SocketException.class))
                        throw new IgniteTooManyOpenFilesException(e);

                    // Stop retrying this address once the time budget is exhausted; this is checked for every
                    // exception, recoverable or not.
                    if (connTimeoutStgy.checkTimeout()) {
                        U.warn(log, "Connection timed out (will stop attempts to perform the connect) " +
                            "[node=" + node.id() + ", connTimeoutStgy=" + connTimeoutStgy +
                            ", failureDetectionTimeoutEnabled=" + cfg.failureDetectionTimeoutEnabled() +
                            ", timeout=" + timeout +
                            ", err=" + e.getMessage() + ", addr=" + addr + ']');

                        String msg = "Failed to connect to node (is node still alive?). " +
                            "Make sure that each ComputeTask and cache Transaction has a timeout set " +
                            "in order to prevent parties from waiting forever in case of network issues " +
                            "[nodeId=" + node.id() + ", addrs=" + addrs + ']';

                        if (errs == null)
                            errs = new IgniteCheckedException(msg, e);
                        else
                            errs.addSuppressed(new IgniteCheckedException(msg, e));

                        break;
                    }

                    // Inverse communication protocol works only for client nodes.
                    if (node.isClient() && isNodeUnreachableException(e))
                        failedAddrsSet.add(addr);

                    // Recoverable error: wait DFLT_RECONNECT_DELAY and retry the same address.
                    // Otherwise record the error and move on to the next address.
                    if (isRecoverableException(e))
                        U.sleep(DFLT_RECONNECT_DELAY);
                    else {
                        String msg = "Failed to connect to node due to unrecoverable exception (is node still alive?). " +
                            "Make sure that each ComputeTask and cache Transaction has a timeout set " +
                            "in order to prevent parties from waiting forever in case of network issues " +
                            "[nodeId=" + node.id() + ", addrs=" + addrs + ", err= " + e + ']';

                        if (errs == null)
                            errs = new IgniteCheckedException(msg, e);
                        else
                            errs.addSuppressed(new IgniteCheckedException(msg, e));

                        break;
                    }
                }
                finally {
                    // Runs on every exit path of the try: normal completion, return, continue, break and rethrow.
                    connectGate.leave();
                }

                // Report progress when this method runs on the communication worker thread.
                CommunicationWorker commWorker0 = commWorker;

                if (commWorker0 != null && commWorker0.runner() == Thread.currentThread())
                    commWorker0.updateHeartbeat();
            }

            // A session was created: the remaining addresses are not tried.
            if (ses != null)
                break;
        }

        // No address produced a session.
        if (ses == null) {
            // If local node and remote node are configured to use paired connections we won't even request
            // inverse connection, so there is no point in throwing NodeUnreachableException.
            if (!cfg.usePairedConnections() || !Boolean.TRUE.equals(node.attribute(attrs.pairedConnection()))) {
                if (!(Thread.currentThread() instanceof IgniteDiscoveryThread) && locNodeIsSrv) {
                    // Every address that was not skipped as local ended up in failedAddrsSet.
                    // NOTE(review): this compares counts, so it assumes addrs contains no duplicate addresses.
                    if (node.isClient() && (addrs.size() - skippedAddrs == failedAddrsSet.size())) {
                        String msg = "Failed to connect to all addresses of node " + node.id() + ": " + failedAddrsSet +
                            "; inverse connection will be requested.";

                        throw new NodeUnreachableException(msg);
                    }
                }
            }

            // NOTE(review): processSessionCreationError(...) is presumably expected to throw; if it returns
            // normally, this method returns null below.
            processSessionCreationError(node, addrs, errs == null ? new IgniteCheckedException("No session found") : errs);
        }

        return ses;
    }