in modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java [3412:3988]
/**
 * Sends the given discovery message to the next node in the ring, (re)establishing the connection when needed
 * and moving on to another candidate node when the current one cannot be reached.
 * <p>
 * Outline of what the method does:
 * <ol>
 * <li>Invokes the registered send listeners, the tracing hook (for {@code TraceableMessage}) and
 * {@code sendMessageToClients(msg)}.</li>
 * <li>Takes a snapshot of the shared failed nodes and of the SPI state under {@code mux}.</li>
 * <li>Ring loop: asks the ring for the next node that is not in the local failed-nodes list. If there is none,
 * the message is handed back to the local queue via {@code addMessage(msg, true)} (except for connection check
 * messages and status check messages created by the local node) and the loop ends.</li>
 * <li>Address loop: for every address in {@code nextAddrs} a socket is opened and a handshake is performed when
 * there is no open socket; then pending messages (if required) and the message itself are written, each followed
 * by reading a receipt.</li>
 * <li>If the message could not be sent through any address, the next node is added to the local failed-nodes list
 * (subject to the state tracked by {@code sndState}) and the ring loop is repeated.</li>
 * <li>After the loop, nodes that were found failed by this call are recorded in the shared structures and a
 * {@code TcpDiscoveryNodeFailedMessage} is added to {@code msgWorker} for each of them.</li>
 * </ol>
 * The method returns early, after calling {@code segmentLocalNodeOnSendFail(failedNodes)}, when {@code sndState}
 * reports a timeout or a failed state.
 *
 * @param msg Message to send. Must not be {@code null}.
 */
private void sendMessageAcrossRing(TcpDiscoveryAbstractMessage msg) {
assert msg != null;
assert ring.hasRemoteNodes();
// Notify every registered send listener before anything is written.
for (IgniteInClosure<TcpDiscoveryAbstractMessage> msgLsnr : spi.sndMsgLsnrs)
msgLsnr.apply(msg);
if (msg instanceof TraceableMessage)
tracing.messages().beforeSend((TraceableMessage)msg);
// Called before any attempt to send the message over the ring.
sendMessageToClients(msg);
// Local working copy; it shadows the field ServerImpl.this.failedNodes and is extended during this call.
List<TcpDiscoveryNode> failedNodes;
TcpDiscoverySpiState state;
// Snapshot of the shared failed nodes and of the SPI state, taken under the lock.
synchronized (mux) {
failedNodes = U.arrayList(ServerImpl.this.failedNodes.keySet());
state = spiState;
}
// Exceptions collected while trying to reach the current next node; reset when a handshake succeeds
// or when the method moves on to another node.
Collection<Throwable> errs = null;
// Set once the message has been written and its receipt has been read.
boolean sent = false;
// Set when the next node has been switched during this call; triggers resending of pending messages.
boolean newNextNode = false;
// Used only if spi.getEffectiveConnectionRecoveryTimeout > 0
CrossRingMessageSendState sndState = null;
UUID locNodeId = getLocalNodeId();
// Each iteration of this loop targets one candidate next node.
ringLoop: while (true) {
// NOTE(review): presumably returns the next ring node excluding the given ones - confirm in the ring class.
TcpDiscoveryNode newNext = ring.nextNode(failedNodes);
if (newNext == null) {
if (log.isDebugEnabled())
log.debug("No next node in topology.");
if (debugMode)
debugLog(msg, "No next node in topology.");
// No node to send to: hand the message back to the local queue unless it is a connection check
// message or a status check message created by the local node.
if (ring.hasRemoteNodes() && !(msg instanceof TcpDiscoveryConnectionCheckMessage) &&
!(msg instanceof TcpDiscoveryStatusCheckMessage && msg.creatorNodeId().equals(locNodeId))) {
msg.senderNodeId(locNodeId);
addMessage(msg, true);
}
break;
}
if (!newNext.equals(next)) {
if (log.isDebugEnabled()) {
log.debug("New next node [newNext=" + newNext + ", formerNext=" + next +
", ring=" + ring + ", failedNodes=" + failedNodes + ']');
}
else if (log.isInfoEnabled())
log.info("New next node [newNext=" + newNext + ']');
if (debugMode)
debugLog(msg, "New next node [newNext=" + newNext + ", formerNext=" + next +
", ring=" + ring + ", failedNodes=" + failedNodes + ']');
// The next node has changed: drop the current connection and switch to the new node.
U.closeQuiet(sock);
sock = null;
newNextNode(newNext);
newNextNode = true;
}
else if (log.isTraceEnabled())
log.trace("Next node remains the same [nextId=" + next.id() +
", nextOrder=" + next.internalOrder() + ']');
// Addresses of the local node; an address of the next node that matches one of them is skipped below.
List<InetSocketAddress> locNodeAddrs = U.arrayList(locNode.socketAddresses());
// Try the addresses of the next node one by one.
// NOTE(review): nextAddrs is a field not assigned in this method; presumably set by newNextNode(...) - confirm.
addr: for (InetSocketAddress addr : nextAddrs) {
// Starts from the configured ack timeout for every address; doubled in the catch blocks below on socket
// timeouts when the failure detection timeout is disabled.
long ackTimeout0 = spi.getAckTimeout();
if (locNodeAddrs.contains(addr)) {
if (log.isDebugEnabled()) {
log.debug("Skip to send message to the local node (probably remote node has the same " +
"loopback address that local node): " + addr);
}
continue;
}
// Number of failed attempts for this address; compared with spi.getReconnectCount() when the failure
// detection timeout is disabled.
int reconCnt = 0;
IgniteSpiOperationTimeoutHelper timeoutHelper = null;
// Retry loop for the current address: connect and handshake (if there is no socket), then send.
while (true) {
// With an already open socket the connect/handshake part is skipped and the send part runs directly.
if (sock == null) {
// We re-create the helper here because it could be created earlier with wrong timeout on
// message sending like IgniteConfiguration.failureDetectionTimeout. Here we are in the
// state of connection recovering and have to work with
// TcpDiscoverySpi.getEffectiveConnectionRecoveryTimeout()
if (timeoutHelper == null || sndState != null)
timeoutHelper = serverOperationTimeoutHelper(sndState, -1);
// True only when the handshake has completed and the response has been accepted.
boolean success = false;
// True once the socket and its output stream have been obtained; lets the catch block tell
// a failure to connect from a failure during the handshake.
boolean openSock = false;
// Restore ring.
try {
sock = spi.openSocket(addr, timeoutHelper);
out = spi.socketStream(sock);
openSock = true;
// Handshake.
TcpDiscoveryHandshakeRequest hndMsg = new TcpDiscoveryHandshakeRequest(locNodeId);
// Topology treated as changes if next node is not available.
boolean changeTop = sndState != null && !sndState.isStartingPoint();
// In that case the request carries the ID of the node preceding 'next' in the ring.
if (changeTop)
hndMsg.changeTopology(ring.previousNodeOf(next).id());
// Note: nextTimeoutChunk(...) is invoked here for the log message and again for the actual
// operation below, so with debug logging enabled it is called twice per operation.
if (log.isDebugEnabled()) {
log.debug("Sending handshake [hndMsg=" + hndMsg + ", sndState=" + sndState +
"] with timeout " + timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
}
spi.writeToSocket(sock, out, hndMsg,
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
if (log.isDebugEnabled()) {
log.debug("Reading handshake response with timeout " +
timeoutHelper.nextTimeoutChunk(ackTimeout0));
}
TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null,
timeoutHelper.nextTimeoutChunk(ackTimeout0));
if (log.isDebugEnabled())
log.debug("Handshake response: " + res);
// We should take previousNodeAlive flag into account
// only if we received the response from the correct node.
if (res.creatorNodeId().equals(next.id()) && res.previousNodeAlive() && sndState != null) {
sndState.checkTimeout();
// Remote node checked connection to its previous and got success.
boolean previousNode = sndState.markLastFailedNodeAlive();
// Either take the most recently added node out of the local failed list, or re-select
// the next node from the ring.
if (previousNode)
failedNodes.remove(failedNodes.size() - 1);
else {
newNextNode = false;
newNextNode(ring.nextNode(failedNodes));
}
U.closeQuiet(sock);
sock = null;
if (sndState.isFailed()) {
segmentLocalNodeOnSendFail(failedNodes);
return; // Nothing to do here.
}
if (previousNode)
U.warn(log, "New next node has connection to it's previous, trying previous " +
"again. [next=" + next + ']');
// Start over with a freshly selected next node.
continue ringLoop;
}
// The response was created by the local node itself: give up on this address.
if (locNodeId.equals(res.creatorNodeId())) {
if (log.isDebugEnabled())
log.debug("Handshake response from local node: " + res);
U.closeQuiet(sock);
sock = null;
break;
}
UUID nextId = res.creatorNodeId();
long nextOrder = res.order();
if (!next.id().equals(nextId)) {
// Node with different ID has bound to the same port.
if (log.isDebugEnabled()) {
log.debug("Failed to restore ring because next node ID received is not as " +
"expected [expectedId=" + next.id() + ", rcvdId=" + nextId + ']');
}
if (debugMode) {
debugLog(msg, "Failed to restore ring because next node ID received is not " +
"as expected [expectedId=" + next.id() + ", rcvdId=" + nextId + ']');
}
// Leaves the retry loop (the socket is closed in the finally block); the next address is tried.
break;
}
else {
// ID is as expected. Check node order.
if (nextOrder != next.internalOrder()) {
// Is next currently being added?
boolean nextNew = (msg instanceof TcpDiscoveryNodeAddedMessage &&
((TcpDiscoveryNodeAddedMessage)msg).node().id().equals(nextId));
if (!nextNew)
nextNew = hasPendingAddMessage(nextId);
// An order mismatch is tolerated only for a node that is currently being added.
if (!nextNew) {
if (log.isDebugEnabled()) {
log.debug("Failed to restore ring because next node order received " +
"is not as expected [expected=" + next.internalOrder() +
", rcvd=" + nextOrder + ", id=" + next.id() + ']');
}
if (debugMode) {
debugLog(msg, "Failed to restore ring because next node order " +
"received is not as expected [expected=" + next.internalOrder() +
", rcvd=" + nextOrder + ", id=" + next.id() + ']');
}
break;
}
}
// Handshake accepted: the connection to the next node is established.
updateLastSentMessageTime();
if (log.isDebugEnabled())
log.debug("Initialized connection with next node: " + next.id());
if (debugMode)
debugLog(msg, "Initialized connection with next node: " + next.id());
// Errors collected so far are discarded, the address is remembered on the node.
errs = null;
success = true;
next.lastSuccessfulAddress(addr);
}
}
catch (IOException | IgniteCheckedException e) {
if (errs == null)
errs = new ArrayList<>();
errs.add(e);
if (log.isDebugEnabled())
U.error(log, "Failed to connect to next node [msg=" + msg
+ ", err=" + e.getMessage() + ']', e);
onException("Failed to connect to next node [msg=" + msg + ", err=" + e + ']', e);
// Fastens failure detection.
if (sndState != null && sndState.checkTimeout()) {
segmentLocalNodeOnSendFail(failedNodes);
return; // Nothing to do here.
}
if (!openSock)
break; // Don't retry if we can not establish connection.
// Failure detection timeout disabled: the number of attempts is limited by the reconnect count.
if (!spi.failureDetectionTimeoutEnabled() && ++reconCnt == spi.getReconnectCount())
break;
// Failure detection timeout enabled: stop once the exception indicates the timeout has been reached.
if (spi.failureDetectionTimeoutEnabled() && IgniteSpiOperationTimeoutHelper.checkFailureTimeoutReached(e))
break;
else if (!spi.failureDetectionTimeoutEnabled() && (e instanceof
SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class))) {
// Socket timeout: double the ack timeout and stop if checkAckTimeout(...) rejects the new value.
ackTimeout0 *= 2;
if (!checkAckTimeout(ackTimeout0))
break;
}
// Retry the same address.
continue;
}
finally {
// Runs on every exit from the try block, including 'break', 'continue' and 'return'.
if (!success) {
if (log.isDebugEnabled())
log.debug("Closing socket to next: " + next);
U.closeQuiet(sock);
sock = null;
}
else {
// Resetting timeout control object to let the code below to use a new one
// for the next bunch of operations.
timeoutHelper = null;
}
}
}
// Send part: a connection to the next node is open at this point.
try {
boolean failure;
// True when the local list holds more entries than the shared map of failed nodes.
// NOTE(review): presumably this means failures detected by this call are not published yet - confirm.
synchronized (mux) {
failure = ServerImpl.this.failedNodes.size() < failedNodes.size();
}
assert !forceSndPending || msg instanceof TcpDiscoveryNodeLeftMessage;
// Pending messages are sent ahead of the message itself in any of these three cases.
if (failure || forceSndPending || newNextNode) {
if (log.isDebugEnabled()) {
log.debug("Pending messages will be sent [failure=" + failure +
", newNextNode=" + newNextNode +
", forceSndPending=" + forceSndPending +
", failedNodes=" + failedNodes + ']');
}
if (debugMode) {
debugLog(msg, "Pending messages will be sent [failure=" + failure +
", newNextNode=" + newNextNode +
", forceSndPending=" + forceSndPending +
", failedNodes=" + failedNodes + ']');
}
for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs) {
long tsNanos = System.nanoTime();
prepareNodeAddedMessage(pendingMsg, next.id(), pendingMsgs.msgs,
pendingMsgs.customDiscardId);
addFailedNodes(pendingMsg, failedNodes);
if (timeoutHelper == null)
timeoutHelper = serverOperationTimeoutHelper(sndState, lastRingMsgSentTime);
try {
spi.writeToSocket(sock, out, pendingMsg, timeoutHelper.nextTimeoutChunk(
spi.getSocketTimeout()));
}
finally {
// Undo prepareNodeAddedMessage(...) whether or not the write succeeded.
clearNodeAddedMessage(pendingMsg);
}
// Taken before the receipt is read, so the reported duration covers preparation and writing only.
long tsNanos0 = System.nanoTime();
int res = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0));
updateLastSentMessageTime();
spi.stats.onMessageSent(pendingMsg, U.nanosToMillis(tsNanos0 - tsNanos));
if (log.isDebugEnabled()) {
log.debug("Pending message has been sent to next node [msgId=" + msg.id() +
", pendingMsgId=" + pendingMsg.id() + ", next=" + next.id() +
", res=" + res + ']');
}
if (debugMode) {
debugLog(msg, "Pending message has been sent to next node [msgId=" + msg.id() +
", pendingMsgId=" + pendingMsg.id() + ", next=" + next.id() +
", res=" + res + ']');
}
// Resetting timeout control object to create a new one for the next bunch of
// operations.
timeoutHelper = null;
}
}
// Connection check messages are sent without the node-added preparation.
if (!(msg instanceof TcpDiscoveryConnectionCheckMessage))
prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs,
pendingMsgs.customDiscardId);
try {
// The serialize version set here is restored in the finally block below.
SecurityUtils.serializeVersion(1);
long tsNanos = System.nanoTime();
if (timeoutHelper == null)
timeoutHelper = serverOperationTimeoutHelper(sndState, lastRingMsgSentTime);
addFailedNodes(msg, failedNodes);
boolean latencyCheck = msg instanceof TcpDiscoveryRingLatencyCheckMessage;
if (latencyCheck && log.isInfoEnabled())
log.info("Latency check message has been written to socket: " + msg.id());
// This overload additionally takes the target node as its first argument.
spi.writeToSocket(newNextNode ? newNext : next,
sock,
out,
msg,
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
// Taken before the receipt is read, so the reported duration does not include waiting for the receipt.
long tsNanos0 = System.nanoTime();
int res = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0));
updateLastSentMessageTime();
if (latencyCheck && log.isInfoEnabled())
log.info("Latency check message has been acked: " + msg.id());
spi.stats.onMessageSent(msg, U.nanosToMillis(tsNanos0 - tsNanos));
onMessageExchanged();
DebugLogger debugLog = messageLogger(msg);
if (debugLog.isDebugEnabled()) {
debugLog.debug("Message has been sent to next node [msg=" + msg +
", next=" + next.id() +
", res=" + res + ']');
}
if (debugMode) {
debugLog(msg, "Message has been sent to next node [msg=" + msg +
", next=" + next.id() +
", res=" + res + ']');
}
}
finally {
SecurityUtils.restoreDefaultSerializeVersion();
clearNodeAddedMessage(msg);
}
// The message has been written and its receipt read: register it as pending and stop trying addresses.
registerPendingMessage(msg);
sent = true;
break addr;
}
catch (IOException | IgniteCheckedException e) {
if (errs == null)
errs = new ArrayList<>();
errs.add(e);
if (log.isDebugEnabled())
U.error(log, "Failed to send message to next node [next=" + next.id() + ", msg=" + msg +
", err=" + e + ']', e);
onException("Failed to send message to next node [next=" + next.id() + ", msg=" + msg + ']',
e);
// Each 'break' below leaves the retry loop, so the next address is tried. Without a 'break' the loop
// repeats for the same address; the socket has been closed in the finally block, so it reconnects.
if (spi.failureDetectionTimeoutEnabled() && IgniteSpiOperationTimeoutHelper.checkFailureTimeoutReached(e))
break;
if (!spi.failureDetectionTimeoutEnabled()) {
if (++reconCnt == spi.getReconnectCount())
break;
else if (e instanceof SocketTimeoutException ||
X.hasCause(e, SocketTimeoutException.class)) {
ackTimeout0 *= 2;
if (!checkAckTimeout(ackTimeout0))
break;
}
}
}
finally {
// The flag is cleared after every send attempt, successful or not.
forceSndPending = false;
if (!sent) {
if (log.isDebugEnabled())
log.debug("Closing socket to next (not sent): " + next);
U.closeQuiet(sock);
sock = null;
if (log.isDebugEnabled()) {
log.debug("Message has not been sent [next=" + next.id() + ", msg=" + msg +
(!spi.failureDetectionTimeoutEnabled() ? ", i=" + reconCnt : "") + ']');
}
}
}
} // Try to reconnect.
} // Iterating node's addresses.
// None of the addresses of the current next node worked.
if (!sent) {
// The recovery state is created on the first failure when a connection recovery timeout is configured;
// on later failures its timeout is checked instead.
if (sndState == null && spi.getEffectiveConnectionRecoveryTimeout() > 0)
sndState = new CrossRingMessageSendState();
else if (sndState != null && sndState.checkTimeout()) {
segmentLocalNodeOnSendFail(failedNodes);
return; // Nothing to do here.
}
// Without a recovery state the next node is always treated as failed; otherwise the state decides.
boolean failedNextNode = sndState == null || sndState.markNextNodeFailed();
if (failedNextNode && !failedNodes.contains(next)) {
failedNodes.add(next);
if (state == CONNECTED) {
Exception err = errs != null ?
U.exceptionWithSuppressed("Failed to send message to next node [msg=" + msg +
", next=" + U.toShortString(next) + ']', errs) :
null;
// If node existed on connection initialization we should check
// whether it has not gone yet.
U.warn(log, "Failed to send message to next node [msg=" + msg + ", next=" + next +
", errMsg=" + (err != null ? err.getMessage() : "N/A") + ']');
}
}
else if (!failedNextNode && sndState != null && sndState.isBackward()) {
boolean prev = sndState.markLastFailedNodeAlive();
U.warn(log, "Failed to send message to next node, try previous [msg=" + msg +
", next=" + next + ']');
// Same handling as for a handshake response reporting that the previous node is alive.
if (prev)
failedNodes.remove(failedNodes.size() - 1);
else {
newNextNode = false;
newNextNode(ring.nextNode(failedNodes));
}
}
// Reset the next node and the collected errors before the next iteration of the ring loop.
newNextNode(null);
errs = null;
}
else
break;
}
// Keep in the local list only the nodes that are not in the shared map of failed nodes.
synchronized (mux) {
failedNodes.removeAll(ServerImpl.this.failedNodes.keySet());
}
if (!failedNodes.isEmpty()) {
if (state == CONNECTED) {
if (!sent && log.isDebugEnabled())
// Message has not been sent due to some problems.
log.debug("Message has not been sent: " + msg);
if (log.isDebugEnabled())
log.debug("Detected failed nodes: " + failedNodes);
}
// Record the remaining nodes in the shared structures: the map entry gets the local node ID as its value.
synchronized (mux) {
for (TcpDiscoveryNode failedNode : failedNodes) {
if (!ServerImpl.this.failedNodes.containsKey(failedNode))
ServerImpl.this.failedNodes.put(failedNode, locNodeId);
}
for (TcpDiscoveryNode failedNode : failedNodes)
failedNodesMsgSent.add(failedNode.id());
}
// One node failed message per such node is added to the message worker.
for (TcpDiscoveryNode n : failedNodes)
msgWorker.addMessage(new TcpDiscoveryNodeFailedMessage(locNodeId, n.id(), n.internalOrder()));
if (!sent) {
assert next == null : next;
if (log.isDebugEnabled())
log.debug("Pending messages will be resent to local node");
if (debugMode)
debugLog(msg, "Pending messages will be resent to local node");
processPendingMessagesLocally(msg);
}
LT.warn(log, "Local node has detected failed nodes and started cluster-wide procedure. " +
"To speed up failure detection please see 'Failure Detection' section under javadoc" +
" for 'TcpDiscoverySpi'");
}
}