public GridCommunicationClient reserveClient()

in modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionClientPool.java [188:338]


    /**
     * Returns a reserved communication client for the given node and connection index, creating the client
     * if none is currently registered in {@code clients} for that slot.
     * <p>
     * Connection attempts for the same {@code (nodeId, connIdx)} pair are serialized through {@code clientFuts}:
     * the thread that manages to register its future performs the connect, every other thread waits on the
     * already registered future. The method loops until a client is successfully reserved or an exception
     * is thrown.
     *
     * @param node Node to which a client should be reserved, must not be {@code null}.
     * @param connIdx Connection index.
     * @return Client for which {@code reserve()} returned {@code true}.
     * @throws IgniteCheckedException If the local node is disconnecting, or if waiting on the connect future
     *      failed with an error other than a timeout.
     * @throws IgniteSpiException If both the local and the target nodes are clients and the target has no server
     *      socket opened, if this pool is stopping, or if the destination node is no longer in topology.
     */
    public GridCommunicationClient reserveClient(ClusterNode node, int connIdx) throws IgniteCheckedException {
        assert node != null;
        assert (connIdx >= 0 && connIdx < cfg.connectionsPerNode())
            || !(cfg.usePairedConnections() && usePairedConnections(node, attrs.pairedConnection()))
            || GridNioServerWrapper.isChannelConnIdx(connIdx) : "Wrong communication connection index: " + connIdx;

        // Client-to-client communication is impossible when the target client has its port disabled.
        if (locNodeSupplier.get().isClient()
            && node.isClient()
            && DISABLED_CLIENT_PORT.equals(node.attribute(attrs.port())))
            throw new IgniteSpiException("Cannot send message to the client node with no server socket opened.");

        UUID nodeId = node.id();

        if (log.isDebugEnabled())
            log.debug("The node client is going to reserve a connection [nodeId=" + node.id() + ", connIdx=" + connIdx + "]");

        while (true) {
            GridCommunicationClient[] curClients = clients.get(nodeId);

            GridCommunicationClient client = curClients != null && connIdx < curClients.length ?
                curClients[connIdx] : null;

            if (client == null) {
                if (stopping)
                    throw new IgniteSpiException("Node is stopping.");

                // Do not allow concurrent connects.
                GridFutureAdapter<GridCommunicationClient> fut = new ConnectFuture();

                ConnectionKey connKey = new ConnectionKey(nodeId, connIdx, -1);

                GridFutureAdapter<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(connKey, fut);

                if (oldFut == null) {
                    // This thread owns the connect attempt for the key.
                    try {
                        // Re-read the slot: another thread may have registered a client in the meantime.
                        GridCommunicationClient[] curClients0 = clients.get(nodeId);

                        GridCommunicationClient client0 = curClients0 != null && connIdx < curClients0.length ?
                            curClients0[connIdx] : null;

                        if (client0 == null) {
                            client0 = createCommunicationClient(node, connIdx);

                            if (client0 != null) {
                                addNodeClient(node, connIdx, client0);

                                if (client0 instanceof GridTcpNioCommunicationClient) {
                                    GridTcpNioCommunicationClient tcpClient = ((GridTcpNioCommunicationClient)client0);

                                    // The session could have been closed right after creation: drop the client
                                    // and complete the future with null so that the outer loop retries.
                                    if (tcpClient.session().closeTime() > 0 && removeNodeClient(nodeId, client0)) {
                                        if (log.isDebugEnabled()) {
                                            log.debug("Session was closed after client creation, will retry " +
                                                "[node=" + node + ", client=" + client0 + ']');
                                        }

                                        client0 = null;
                                    }
                                }
                            }
                            else {
                                // No client was created: back off before the retry and make sure
                                // the node is still known.
                                U.sleep(200);

                                if (nodeGetter.apply(node.id()) == null)
                                    throw new ClusterTopologyCheckedException("Failed to send message " +
                                        "(node left topology): " + node);
                            }
                        }

                        fut.onDone(client0);
                    }
                    catch (NodeUnreachableException e) {
                        log.warning(e.getMessage());

                        fut = handleUnreachableNodeException(node, connIdx, fut, e);
                    }
                    catch (Throwable e) {
                        // NodeUnreachableException never gets here, it is handled by the catch clause above.
                        // Other failures are published through the future and surface from fut.get() below.
                        fut.onDone(e);

                        if (e instanceof IgniteTooManyOpenFilesException)
                            throw e;

                        if (e instanceof Error)
                            throw (Error)e;
                    }
                    finally {
                        clientFuts.remove(connKey, fut);
                    }
                }
                else
                    fut = oldFut;

                long clientReserveWaitTimeout = registry != null ? registry.getSystemWorkerBlockedTimeout() / 3
                    : cfg.connectionTimeout() / 3;

                // Total time spent waiting on the future so far; grows by one wait slice per timeout.
                long waitedMs = 0;

                // This cycle will eventually quit when future is completed by concurrent thread reserving client.
                while (true) {
                    try {
                        client = fut.get(clientReserveWaitTimeout, TimeUnit.MILLISECONDS);

                        break;
                    }
                    catch (IgniteFutureTimeoutCheckedException ignored) {
                        waitedMs += clientReserveWaitTimeout;

                        if (log.isDebugEnabled()) {
                            log.debug(
                                "Still waiting for reestablishing connection to node " +
                                    "[nodeId=" + node.id() + ", waitingTime=" + waitedMs + "ms]"
                            );
                        }

                        // Refresh the heartbeat of the worker registered under the current thread name, if any,
                        // while this thread keeps waiting.
                        if (registry != null) {
                            GridWorker wrkr = registry.worker(Thread.currentThread().getName());

                            if (wrkr != null)
                                wrkr.updateHeartbeat();
                        }
                    }
                }

                if (client == null) {
                    if (clusterStateProvider.isLocalNodeDisconnected())
                        throw new IgniteCheckedException("Unable to create TCP client due to local node disconnecting.");
                    else
                        continue;
                }

                // The node may have left while the connection was being established.
                if (nodeGetter.apply(nodeId) == null) {
                    if (removeNodeClient(nodeId, client))
                        client.forceClose();

                    throw new IgniteSpiException("Destination node is not in topology: " + node.id());
                }
            }

            assert connIdx == client.connectionIndex() : client;

            if (client.reserve())
                return client;
            else
                // Client has just been closed by idle worker. Help it and try again.
                removeNodeClient(nodeId, client);
        }
    }