private void onFirstMessage()

in modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/InboundConnectionHandler.java [465:655]


    /**
     * Handles the first message received on an inbound session. Resolves the sender, stores the sender's
     * consistent ID and the connection key in session meta, and then either answers with a
     * {@code RecoveryLastReceivedMessage} or tries to reserve the inbound recovery descriptor.
     *
     * @param ses Session the message was received on.
     * @param msg First message, either a {@code NodeIdMessage} or a {@code HandshakeMessage}.
     */
    private void onFirstMessage(final GridNioSession ses, Message msg) {
        final UUID sndId;
        final ConnectionKey connKey;

        if (msg instanceof NodeIdMessage) {
            NodeIdMessage idMsg = (NodeIdMessage)msg;

            sndId = U.bytesToUuid(idMsg.nodeIdBytes(), 0);
            connKey = new ConnectionKey(sndId, 0, -1);
        }
        else {
            assert msg instanceof HandshakeMessage : msg;

            HandshakeMessage firstHandshake = (HandshakeMessage)msg;

            sndId = firstHandshake.nodeId();
            connKey = new ConnectionKey(sndId, firstHandshake.connectionIndex(), firstHandshake.connectCount());
        }

        if (log.isDebugEnabled())
            log.debug("Remote node ID received: " + sndId);

        final ClusterNode rmtNode = nodeGetter.apply(sndId);

        // Sender could not be resolved: answer and close, nothing else is done for this session.
        if (rmtNode == null) {
            rejectUnresolvedSender(ses, sndId);

            return;
        }

        ses.addMeta(CONSISTENT_ID_META, rmtNode.consistentId());

        final ConnectionKey prevKey = ses.addMeta(CONN_IDX_META, connKey);

        assert prevKey == null;

        ClusterNode locNode = locNodeSupplier.get();

        if (ses.remoteAddress() == null)
            return;

        // From here on only a handshake message is expected.
        assert msg instanceof HandshakeMessage : msg;

        HandshakeMessage handshake = (HandshakeMessage)msg;

        if (log.isDebugEnabled()) {
            log.debug("Received handshake message [locNodeId=" + locNode.id() + ", rmtNodeId=" + sndId +
                ", msg=" + handshake + ']');
        }

        // Case 1: channel connection index, answered with a zero counter.
        if (GridNioServerWrapper.isChannelConnIdx(handshake.connectionIndex())) {
            ses.send(new RecoveryLastReceivedMessage(0));

            return;
        }

        // Case 2: paired connections are enabled both in the configuration and for the remote node.
        if (cfg.usePairedConnections() && usePairedConnections(rmtNode, attributeNames.pairedConnection())) {
            final GridNioRecoveryDescriptor pairedDesc = nioSrvWrapper.inRecoveryDescriptor(rmtNode, connKey);

            ConnectClosureNew reserveLsnr = new ConnectClosureNew(ses, pairedDesc, rmtNode);

            if (pairedDesc.tryReserve(handshake.connectCount(), reserveLsnr))
                connectedNew(pairedDesc, ses, true);
            else if (reserveLsnr.failed) {
                ses.send(new RecoveryLastReceivedMessage(ALREADY_CONNECTED));

                closeStaleConnections(connKey);
            }

            return;
        }

        // Case 3: regular connection.
        assert connKey.connectionIndex() >= 0 : connKey;

        GridCommunicationClient pooledClient = pooledClientAt(sndId, connKey);

        if (pooledClient instanceof GridTcpNioCommunicationClient) {
            rejectAsAlreadyConnected(ses, connKey, locNode, sndId);

            return;
        }

        GridFutureAdapter<GridCommunicationClient> fut = new GridFutureAdapter<>();

        GridFutureAdapter<GridCommunicationClient> oldFut = clientPool.putIfAbsentFut(connKey, fut);

        final GridNioRecoveryDescriptor recoveryDesc = nioSrvWrapper.inRecoveryDescriptor(rmtNode, connKey);

        if (oldFut == null) {
            // No earlier future was registered for this key, so the pool is consulted once more
            // after 'fut' has been put.
            pooledClient = pooledClientAt(sndId, connKey);

            if (pooledClient instanceof GridTcpNioCommunicationClient) {
                assert pooledClient.connectionIndex() == connKey.connectionIndex() : pooledClient;

                rejectAsAlreadyConnected(ses, connKey, locNode, sndId);

                fut.onDone(pooledClient);

                return;
            }

            boolean reserved = recoveryDesc.tryReserve(handshake.connectCount(),
                new ConnectClosure(ses, recoveryDesc, rmtNode, connKey, handshake, true, fut));

            if (log.isDebugEnabled()) {
                log.debug("Received incoming connection from remote node " +
                    "[rmtNode=" + rmtNode.id() + ", reserved=" + reserved +
                    ", recovery=" + recoveryDesc + ']');
            }

            if (reserved) {
                try {
                    GridTcpNioCommunicationClient created =
                        connected(recoveryDesc, ses, rmtNode, handshake.received(), true, true);

                    fut.onDone(created);
                }
                finally {
                    // The future is removed from the pool whether or not 'connected' completed normally.
                    clientPool.removeFut(connKey, fut);
                }
            }
        }
        else if (oldFut instanceof ConnectFuture && locNode.order() < rmtNode.order()) {
            // An earlier ConnectFuture is registered and the local node has the lower order:
            // the incoming connection is rejected.
            if (log.isInfoEnabled()) {
                log.info("Received incoming connection from remote node while " +
                    "connecting to this node, rejecting [locNode=" + locNode.id() +
                    ", locNodeOrder=" + locNode.order() + ", rmtNode=" + rmtNode.id() +
                    ", rmtNodeOrder=" + rmtNode.order() + ']');
            }

            ses.send(new RecoveryLastReceivedMessage(ALREADY_CONNECTED));
        }
        else {
            boolean reserved = recoveryDesc.tryReserve(handshake.connectCount(),
                new ConnectClosure(ses, recoveryDesc, rmtNode, connKey, handshake, true, fut));

            GridTcpNioCommunicationClient created = null;

            if (reserved)
                created = connected(recoveryDesc, ses, rmtNode, handshake.received(), true, true);

            // A pending connection request future is completed with the new client,
            // or with null when the descriptor was not reserved.
            if (oldFut instanceof ConnectionRequestFuture && !oldFut.isDone())
                oldFut.onDone(created);
        }
    }

    /**
     * Answers a session whose sender was not resolved by the node getter and closes the session once the
     * answer has been sent. {@code UNKNOWN_NODE} is sent (with a warning) when the discovery SPI does not
     * know the sender either, {@code NEED_WAIT} otherwise.
     *
     * @param ses Session to answer and close.
     * @param sndId Sender node ID.
     */
    private void rejectUnresolvedSender(final GridNioSession ses, UUID sndId) {
        DiscoverySpi discoSpi = igniteExSupplier.get().configuration().getDiscoverySpi();

        final boolean unknownNode;

        if (discoSpi instanceof TcpDiscoverySpi) {
            ClusterNode node0 = ((TcpDiscoverySpi)discoSpi).getNode0(sndId);

            assert node0 == null || node0.isClient() : node0;

            unknownNode = node0 == null;
        }
        else if (discoSpi instanceof IgniteDiscoverySpi)
            unknownNode = !((IgniteDiscoverySpi)discoSpi).knownNode(sndId);
        else
            unknownNode = true;

        if (unknownNode) {
            U.warn(log, "Close incoming connection, unknown node [nodeId=" + sndId + ", ses=" + ses + ']');

            ses.send(new RecoveryLastReceivedMessage(UNKNOWN_NODE)).listen(sndFut -> ses.close());
        }
        else
            ses.send(new RecoveryLastReceivedMessage(NEED_WAIT)).listen(sndFut -> ses.close());
    }

    /**
     * Looks up the pooled client stored for the given node at the connection index of the given key.
     *
     * @param nodeId Node ID.
     * @param connKey Connection key supplying the connection index.
     * @return Client at that index, or {@code null} if the pool returns no array for the node or the
     *      index is beyond the array length.
     */
    private GridCommunicationClient pooledClientAt(UUID nodeId, ConnectionKey connKey) {
        GridCommunicationClient[] clients = clientPool.clientFor(nodeId);

        if (clients == null)
            return null;

        int idx = connKey.connectionIndex();

        return idx < clients.length ? clients[idx] : null;
    }

    /**
     * Logs the rejection at info level, sends {@code ALREADY_CONNECTED} to the session and calls
     * {@code closeStaleConnections} for the connection key.
     *
     * @param ses Session to answer.
     * @param connKey Connection key.
     * @param locNode Local node.
     * @param sndId Sender node ID.
     */
    private void rejectAsAlreadyConnected(GridNioSession ses, ConnectionKey connKey, ClusterNode locNode, UUID sndId) {
        if (log.isInfoEnabled()) {
            log.info("Received incoming connection when already connected " +
                "to this node, rejecting [locNode=" + locNode.id() +
                ", rmtNode=" + sndId + ']');
        }

        ses.send(new RecoveryLastReceivedMessage(ALREADY_CONNECTED));

        closeStaleConnections(connKey);
    }