in geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java [967:1154]
/**
 * Creates a sender-side connection to {@code remoteAddr}, retrying until a connection has been
 * constructed and its handshake has completed, or until one of the give-up conditions below
 * throws.
 *
 * <p>
 * Each pass of the retry loop:
 * <ol>
 * <li>checks the conduit's cancel criterion;</li>
 * <li>compares the time elapsed since {@code startTime} against {@code ackTimeout} and
 * {@code ackSATimeout}, logging a warning / fatal message and asking {@code mgr} to suspect the
 * member (each at most once per call);</li>
 * <li>on the first pass only, verifies that the member still exists, is not shunned and that no
 * shutdown is in progress; on later passes, bails out for alerting threads or when
 * {@code doNotRetry} is set, otherwise sleeps for the current reconnect wait time and re-checks
 * whether to give up on the member;</li>
 * <li>constructs a new {@code Connection} and attempts the handshake, closing and discarding the
 * connection if the handshake does not succeed.</li>
 * </ol>
 *
 * <p>
 * The calling thread's interrupt status is cleared on entry (and before each sleep) and is
 * re-asserted before this method returns or throws, if it was ever observed to be set.
 *
 * @param mgr membership used to suspect the member, to test whether it still exists / is shunned
 *        / shutdown is in progress, and passed to {@code giveUpOnMember}
 * @param t connection table; supplies the conduit (cancel criterion and stats), is notified when
 *        file descriptors are exhausted, and is passed to the {@code Connection} constructor and
 *        to {@code attemptHandshake}
 * @param preserveOrder passed to the {@code Connection} constructor; when true and
 *        {@code BATCH_SENDS} is set, a batch send buffer is created on the new connection
 * @param remoteAddr the member to connect to
 * @param sharedResource passed to the {@code Connection} constructor; also consulted when
 *        deciding whether a handshake-time "Too many open files" error is reported to {@code t}
 * @param startTime reference time in milliseconds (compared against
 *        {@link System#currentTimeMillis()}) from which the timeouts are measured
 * @param ackTimeout milliseconds after {@code startTime} at which the member is suspected
 * @param ackSATimeout additional milliseconds after {@code ackTimeout} at which a fatal message
 *        is logged; values {@code <= 0} disable the severe-alert branch
 * @param doNotRetry when true, an {@code IOException} is thrown instead of making a second
 *        attempt
 * @return a connection whose handshake succeeded, with {@code finishedConnecting} set to true
 * @throws IOException if the member is gone / shunned / shutting down on the first pass, if the
 *         calling thread is an alerting thread or {@code doNotRetry} is set after a failed first
 *         attempt, if {@code giveUpOnMember} reports that the member should be abandoned, or if
 *         the constructor throws an {@code SSLHandshakeException}
 * @throws DistributedSystemDisconnectedException declared by this method; presumably raised by
 *         the cancel-criterion checks when the system is disconnecting - confirm against the
 *         cancel criterion implementation
 */
static Connection createSender(final Membership<InternalDistributedMember> mgr,
final ConnectionTable t,
final boolean preserveOrder, final InternalDistributedMember remoteAddr,
final boolean sharedResource,
final long startTime, final long ackTimeout, final long ackSATimeout, boolean doNotRetry)
throws IOException, DistributedSystemDisconnectedException {
boolean success = false;
Connection conn = null;
// keep trying. Note that this may be executing during the shutdown window
// where a cancel criterion has not been established, but threads are being
// interrupted. In this case we must allow the connection to succeed even
// though subsequent messaging using the socket may fail
// Thread.interrupted() clears the interrupt status; the value is remembered here so the
// outer finally block can restore it on the way out.
boolean interrupted = Thread.interrupted();
try {
// Once-only flags: each guards a log message or a suspect request so that it is issued at
// most once across all retry passes of this call.
boolean connectionErrorLogged = false;
long reconnectWaitTime = RECONNECT_WAIT_TIME;
boolean suspected = false;
boolean severeAlertIssued = false;
boolean firstTime = true;
boolean warningPrinted = false;
while (!success) { // keep trying
// Quit if DM has stopped distribution
t.getConduit().getCancelCriterion().checkCancelInProgress(null);
long now = System.currentTimeMillis();
// Severe-alert handling: only active when ackSATimeout is positive, the fatal message has
// not been logged yet, and ackTimeout has already elapsed since startTime.
if (!severeAlertIssued && ackSATimeout > 0 && startTime + ackTimeout < now) {
if (startTime + ackTimeout + ackSATimeout < now) {
// Both ackTimeout and ackSATimeout have elapsed: log at fatal level.
if (remoteAddr != null) {
logger.fatal("Unable to form a TCP/IP connection to {} in over {} seconds",
remoteAddr, (ackSATimeout + ackTimeout) / 1000);
}
severeAlertIssued = true;
} else if (!suspected) {
// Only ackTimeout has elapsed: warn and ask membership to suspect the member.
if (remoteAddr != null) {
logger.warn("Unable to form a TCP/IP connection to {} in over {} seconds",
remoteAddr, ackTimeout / 1000);
}
mgr.suspectMember(remoteAddr,
"Unable to form a TCP/IP connection in a reasonable amount of time");
suspected = true;
}
// Wait no longer than the time remaining until startTime + ackTimeout + ackSATimeout,
// capped at RECONNECT_WAIT_TIME; fall back to RECONNECT_WAIT_TIME if that is not positive.
reconnectWaitTime =
Math.min(RECONNECT_WAIT_TIME, ackSATimeout - (now - startTime - ackTimeout));
if (reconnectWaitTime <= 0) {
reconnectWaitTime = RECONNECT_WAIT_TIME;
}
} else if (!suspected && startTime > 0 && ackTimeout > 0
&& startTime + ackTimeout < now) {
// Severe-alert branch not taken (disabled or already issued): still suspect the member
// once after ackTimeout has elapsed, without logging.
mgr.suspectMember(remoteAddr,
"Unable to form a TCP/IP connection in a reasonable amount of time");
suspected = true;
}
if (firstTime) {
// First pass: no waiting; just make sure the target is still a usable member.
firstTime = false;
if (!mgr.memberExists(remoteAddr) || mgr.isShunned(remoteAddr)
|| mgr.shutdownInProgress()) {
throw new IOException("Member " + remoteAddr + " left the system");
}
} else {
// Later passes: the previous attempt failed, so decide whether a retry is allowed.
// if we're sending an alert and can't connect, bail out. A sick
// alert listener should not prevent cache operations from continuing
if (AlertingAction.isThreadAlerting()) {
// do not change the text of this exception - it is looked for in exception handlers
throw new IOException("Cannot form connection to alert listener " + remoteAddr);
}
if (doNotRetry) {
throw new IOException("Connection not created in first try to " + remoteAddr);
}
// Wait briefly...
// Clear (and remember) any pending interrupt first so that sleep() does not throw
// immediately because of an interrupt that arrived during the previous attempt.
interrupted = Thread.interrupted() || interrupted;
try {
Thread.sleep(reconnectWaitTime);
} catch (InterruptedException ie) {
// Remember the interrupt and let the cancel criterion decide whether to abort.
interrupted = true;
t.getConduit().getCancelCriterion().checkCancelInProgress(ie);
}
t.getConduit().getCancelCriterion().checkCancelInProgress(null);
if (giveUpOnMember(mgr, remoteAddr)) {
throw new IOException(
format("Member %s left the group", remoteAddr));
}
if (!warningPrinted) {
// Logged once; also makes the success message after the loop get logged.
warningPrinted = true;
logger.warn("Connection: Attempting reconnect to peer {}",
remoteAddr);
}
t.getConduit().getStats().incReconnectAttempts();
}
// create connection
try {
conn = new Connection(t, preserveOrder, remoteAddr, sharedResource);
} catch (SSLHandshakeException se) {
// no need to retry if certificates were rejected
throw se;
} catch (IOException ioe) {
// Only give up if the member leaves the view.
if (giveUpOnMember(mgr, remoteAddr)) {
throw ioe;
}
t.getConduit().getCancelCriterion().checkCancelInProgress(null);
// Otherwise fall through to another loop pass: report descriptor exhaustion to the
// connection table, or log the failure (first occurrence only).
if ("Too many open files".equals(ioe.getMessage())) {
t.fileDescriptorsExhausted();
} else if (!connectionErrorLogged) {
connectionErrorLogged = true; // otherwise change to use 100ms intervals causes a lot of
// these
logger.info("Connection: shared={} ordered={} failed to connect to peer {} because: {}",
sharedResource, preserveOrder, remoteAddr,
ioe.getCause() != null ? ioe.getCause() : ioe);
}
} // IOException
finally {
// Runs on both the retry path and the rethrow paths: count every construction attempt
// that did not yield a connection.
if (conn == null) {
t.getConduit().getStats().incFailedConnect();
}
}
if (conn != null) {
// handshake
try {
conn.attemptHandshake(t);
if (conn.isSocketClosed()) {
// something went wrong while reading the handshake
// and the socket was closed or we were sent
// ShutdownMessage
if (giveUpOnMember(mgr, remoteAddr)) {
throw new IOException(format("Member %s left the group", remoteAddr));
}
t.getConduit().getCancelCriterion().checkCancelInProgress(null);
// no success but no need to log; just retry
} else {
// This is the only place success is set, so the loop exits only with a live conn.
success = true;
}
} catch (ConnectionException e) {
// Handshake failed: wrap and throw if the member should be abandoned, otherwise log
// and retry.
if (giveUpOnMember(mgr, remoteAddr)) {
throw new IOException("Handshake failed", e);
}
t.getConduit().getCancelCriterion().checkCancelInProgress(null);
logger.info(
"Connection: shared={} ordered={} handshake failed to connect to peer {} because: {}",
sharedResource, preserveOrder, remoteAddr, e);
} catch (IOException e) {
// Same policy as above, but the original IOException is rethrown unwrapped.
if (giveUpOnMember(mgr, remoteAddr)) {
throw e;
}
t.getConduit().getCancelCriterion().checkCancelInProgress(null);
logger.info(
"Connection: shared={} ordered={} handshake failed to connect to peer {} because: {}",
sharedResource, preserveOrder, remoteAddr, e);
// Unlike the constructor failure path, descriptor exhaustion is only reported here
// when the connection is not a shared resource.
if (!sharedResource && "Too many open files".equals(e.getMessage())) {
t.fileDescriptorsExhausted();
}
} finally {
// On any non-successful handshake (retry or exception), close the connection and drop
// the reference so the next pass constructs a fresh one.
if (!success) {
try {
conn.requestClose("failed handshake");
} catch (Exception ignore) {
// best-effort close: a failure while closing must not mask the handshake outcome
}
conn = null;
}
}
}
} // while
if (warningPrinted) {
// A reconnect warning was logged earlier, so report that the retry eventually worked.
logger.info("{}: Successfully reestablished connection to peer {}",
mgr.getLocalMember(), remoteAddr);
}
} finally {
try {
// Defensive cleanup for exits by exception; the handshake finally block above already
// closes and nulls conn whenever the handshake did not succeed.
if (!success) {
if (conn != null) {
conn.requestClose("failed construction");
conn = null;
}
}
} finally {
// Restore the interrupt status that was cleared on entry or before a sleep.
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}
// Defensive guard: the loop above only exits normally with success == true and conn set.
if (conn == null) {
throw new ConnectionException(
format("Connection: failed construction for peer %s", remoteAddr));
}
if (preserveOrder && BATCH_SENDS) {
conn.createBatchSendBuffer();
}
conn.finishedConnecting = true;
return conn;
}