in modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionClientPool.java [188:338]
/**
 * Reserves a communication client for the given node and connection index, creating and registering
 * a new client when none is currently stored for that slot.
 * <p>
 * The method loops until a client is successfully reserved or an exception is thrown. For a given
 * node/index pair only the thread that wins {@code clientFuts.putIfAbsent(...)} performs the connect;
 * the other threads wait on the future published by that thread.
 *
 * @param node Destination node, must not be {@code null}.
 * @param connIdx Connection index.
 * @return Client that has been successfully reserved.
 * @throws IgniteCheckedException If the client could not be created because the local node is disconnecting,
 *      or if client creation or waiting on the connect future fails.
 * @throws IgniteSpiException If both nodes are clients and the destination has no server socket opened,
 *      if this pool is stopping, or if the destination node is not in topology.
 */
public GridCommunicationClient reserveClient(ClusterNode node, int connIdx) throws IgniteCheckedException {
    assert node != null;
    assert (connIdx >= 0 && connIdx < cfg.connectionsPerNode())
        || !(cfg.usePairedConnections() && usePairedConnections(node, attrs.pairedConnection()))
        || GridNioServerWrapper.isChannelConnIdx(connIdx) : "Wrong communication connection index: " + connIdx;

    // A client node cannot reach another client node that has its server port disabled.
    if (locNodeSupplier.get().isClient()
        && node.isClient()
        && DISABLED_CLIENT_PORT.equals(node.attribute(attrs.port())))
        throw new IgniteSpiException("Cannot send message to the client node with no server socket opened.");

    UUID nodeId = node.id();

    if (log.isDebugEnabled())
        log.debug("The node client is going to reserve a connection [nodeId=" + nodeId + ", connIdx=" + connIdx + "]");

    while (true) {
        GridCommunicationClient[] curClients = clients.get(nodeId);

        GridCommunicationClient client = curClients != null && connIdx < curClients.length ?
            curClients[connIdx] : null;

        if (client == null) {
            if (stopping)
                throw new IgniteSpiException("Node is stopping.");

            // Do not allow concurrent connects.
            GridFutureAdapter<GridCommunicationClient> fut = new ConnectFuture();

            ConnectionKey connKey = new ConnectionKey(nodeId, connIdx, -1);

            GridFutureAdapter<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(connKey, fut);

            if (oldFut == null) {
                // This thread owns the connect attempt for the key.
                try {
                    // Re-read the slot: another thread may have registered a client in the meantime.
                    GridCommunicationClient[] curClients0 = clients.get(nodeId);

                    GridCommunicationClient client0 = curClients0 != null && connIdx < curClients0.length ?
                        curClients0[connIdx] : null;

                    if (client0 == null) {
                        client0 = createCommunicationClient(node, connIdx);

                        if (client0 != null) {
                            addNodeClient(node, connIdx, client0);

                            if (client0 instanceof GridTcpNioCommunicationClient) {
                                GridTcpNioCommunicationClient tcpClient = ((GridTcpNioCommunicationClient)client0);

                                // The session may already be closed right after creation; drop the client and retry.
                                if (tcpClient.session().closeTime() > 0 && removeNodeClient(nodeId, client0)) {
                                    if (log.isDebugEnabled()) {
                                        log.debug("Session was closed after client creation, will retry " +
                                            "[node=" + node + ", client=" + client0 + ']');
                                    }

                                    client0 = null;
                                }
                            }
                        }
                        else {
                            // No client was created: pause before the outer loop retries,
                            // unless the node has left the topology.
                            U.sleep(200);

                            if (nodeGetter.apply(nodeId) == null)
                                throw new ClusterTopologyCheckedException("Failed to send message " +
                                    "(node left topology): " + node);
                        }
                    }

                    // A null result makes every waiter (including this thread) go through the retry path below.
                    fut.onDone(client0);
                }
                catch (NodeUnreachableException e) {
                    log.warning(e.getMessage());

                    fut = handleUnreachableNodeException(node, connIdx, fut, e);
                }
                catch (Throwable e) {
                    // NodeUnreachableException never reaches this clause: it is handled by the clause above.
                    fut.onDone(e);

                    if (e instanceof IgniteTooManyOpenFilesException)
                        throw e;

                    if (e instanceof Error)
                        throw (Error)e;

                    // NOTE(review): other failures are not rethrown here; presumably they surface
                    // from fut.get(...) below since the future was completed with the error - confirm.
                }
                finally {
                    clientFuts.remove(connKey, fut);
                }
            }
            else
                fut = oldFut;

            long clientReserveWaitTimeout = registry != null ? registry.getSystemWorkerBlockedTimeout() / 3
                : cfg.connectionTimeout() / 3;

            // Start of the wait, used only to report the elapsed waiting time in the debug log.
            long waitStartTs = System.currentTimeMillis();

            // This cycle will eventually quit when future is completed by concurrent thread reserving client.
            while (true) {
                try {
                    client = fut.get(clientReserveWaitTimeout, TimeUnit.MILLISECONDS);

                    break;
                }
                catch (IgniteFutureTimeoutCheckedException ignored) {
                    if (log.isDebugEnabled()) {
                        long waitedMs = System.currentTimeMillis() - waitStartTs;

                        log.debug(
                            "Still waiting for reestablishing connection to node " +
                                "[nodeId=" + nodeId + ", waitingTime=" + waitedMs + "ms]"
                        );
                    }

                    // Refresh the heartbeat of the worker registered under the current thread's name, if any,
                    // while this thread keeps waiting.
                    if (registry != null) {
                        GridWorker wrkr = registry.worker(Thread.currentThread().getName());

                        if (wrkr != null)
                            wrkr.updateHeartbeat();
                    }
                }
            }

            if (client == null) {
                if (clusterStateProvider.isLocalNodeDisconnected())
                    throw new IgniteCheckedException("Unable to create TCP client due to local node disconnecting.");
                else
                    continue;
            }

            // The node may have left while the connection was being established.
            if (nodeGetter.apply(nodeId) == null) {
                if (removeNodeClient(nodeId, client))
                    client.forceClose();

                throw new IgniteSpiException("Destination node is not in topology: " + nodeId);
            }
        }

        assert connIdx == client.connectionIndex() : client;

        if (client.reserve())
            return client;
        else
            // Client has just been closed by idle worker. Help it and try again.
            removeNodeClient(nodeId, client);
    }
}