in src/java/org/apache/cassandra/net/OutboundConnection.java [1081:1258]
/**
 * Starts a new attempt to establish this connection.
 *
 * All of the work is done by a method-local helper class, {@code Initiate}, which carries the mutable
 * retry state (back-off delay, messaging version, settings) across successive attempts. A fresh
 * {@code Initiate} is created on every call.
 *
 * @return the future returned by {@code Initiate.initiate()}, i.e. the promise handed to the first attempt
 */
Future<?> initiate()
{
    /**
     * Holds the state of one connection-establishment sequence: the first attempt plus any retries
     * scheduled by {@link #onFailure(Throwable)}.
     */
    class Initiate
    {
        /**
         * If we fail to connect, we want to try to connect again before any messages time out.
         * However, we update this on each failure to ensure we do not retry unreasonably often, and settle
         * on a periodicity that might lead to timeouts in some aggressive systems.
         *
         * Starts at half the minimum RPC timeout; see {@link #onFailure(Throwable)} for how it is used and updated.
         */
        long retryRateMillis = DatabaseDescriptor.getMinRpcTimeout(MILLISECONDS) / 2;

        // our connection settings, possibly updated on retry
        int messagingVersion = template.endpointToVersion().get(template.to);
        OutboundConnectionSettings settings;

        /**
         * Handles a failed attempt, whatever the reason, and tries again if there is still work to send.
         *
         * The failure is logged (at info for {@link ConnectException}, at error otherwise) and passed to
         * {@code JVMStabilityInspector}. If {@code hasPending()} is true, a new attempt is scheduled after
         * {@code max(100, retryRateMillis)} milliseconds and {@code retryRateMillis} is then doubled, capped
         * at 1000. Otherwise the state becomes a dormant {@code Disconnected}.
         *
         * @param cause the reason the attempt failed
         */
        void onFailure(Throwable cause)
        {
            if (cause instanceof ConnectException)
                noSpamLogger.info("{} failed to connect", id(), cause);
            else
                noSpamLogger.error("{} failed to connect", id(), cause);

            JVMStabilityInspector.inspectThrowable(cause);

            if (hasPending())
            {
                // the retry is only allowed to rotate through SSL fallback strategies if this failure was an SSL error
                boolean isSSLFailure = isSSLError(cause);
                Promise<Result<MessagingSuccess>> result = AsyncPromise.withExecutor(eventLoop);
                state = new Connecting(state.disconnected(), result, eventLoop.schedule(() -> attempt(result, isSSLFailure), max(100, retryRateMillis), MILLISECONDS));
                // back off for the next failure: double the delay, but never beyond one second
                retryRateMillis = min(1000, retryRateMillis * 2);
            }
            else
            {
                // this Initiate will be discarded
                state = Disconnected.dormant(state.disconnected().maintenance);
            }
        }

        /**
         * Reacts to the outcome of a handshake that completed without throwing.
         *
         * <ul>
         *   <li>{@code SUCCESS}: installs an {@code Established} state for the new channel and appends a
         *       pipeline handler that reports channel inactivity and exceptions back to this connection.</li>
         *   <li>{@code RETRY}: records the messaging version supplied by the peer and starts over.</li>
         *   <li>{@code INCOMPATIBLE}: treated like any other failure via {@link #onFailure(Throwable)}.</li>
         * </ul>
         *
         * @param result the handshake result to act upon
         */
        void onCompletedHandshake(Result<MessagingSuccess> result)
        {
            switch (result.outcome)
            {
                case SUCCESS:
                    // it is expected that close, if successful, has already cancelled us; so we do not need to worry about leaking connections
                    assert !state.isClosed();

                    MessagingSuccess success = result.success();
                    debug.onConnect(success.messagingVersion, settings);
                    state.disconnected().maintenance.cancel(false);

                    FrameEncoder.PayloadAllocator payloadAllocator = success.allocator;
                    Channel channel = success.channel;
                    Established established = new Established(success.messagingVersion, channel, payloadAllocator, settings);
                    state = established;
                    channel.pipeline().addLast("handleExceptionalStates", new ChannelInboundHandlerAdapter() {
                        @Override
                        public void channelInactive(ChannelHandlerContext ctx)
                        {
                            disconnectNow(established);
                            ctx.fireChannelInactive();
                        }

                        @Override
                        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                        {
                            try
                            {
                                invalidateChannel(established, cause);
                            }
                            catch (Throwable t)
                            {
                                // log rather than rethrow, so a failure while invalidating does not escape this handler
                                logger.error("Unexpected exception in {}.exceptionCaught", this.getClass().getSimpleName(), t);
                            }
                        }
                    });
                    ++successfulConnections;

                    logger.info("{} successfully connected, version = {}, framing = {}, encryption = {}",
                                id(true),
                                success.messagingVersion,
                                settings.framing,
                                encryptionConnectionSummary(channel));
                    break;

                case RETRY:
                    if (logger.isTraceEnabled())
                        logger.trace("{} incorrect legacy peer version predicted; reconnecting", id());

                    // the messaging version we connected with was incorrect; try again with the one supplied by the remote host
                    messagingVersion = result.retry().withMessagingVersion;
                    settings.endpointToVersion.set(settings.to, messagingVersion);

                    // unqualified call: resolves to this Initiate's own initiate(), not the enclosing method
                    initiate();
                    break;

                case INCOMPATIBLE:
                    // we cannot communicate with this peer given its messaging version; mark this as any other failure, and continue trying
                    Throwable t = new IOException(String.format("Incompatible peer: %s, messaging version: %s",
                                                                settings.to, result.incompatible().maxMessagingVersion));
                    t.fillInStackTrace();
                    onFailure(t);
                    break;

                default:
                    throw new AssertionError();
            }
        }

        /**
         * Initiate all the actions required to establish a working, valid connection. This includes
         * opening the socket, negotiating the internode messaging handshake, and setting up the working
         * Netty {@link Channel}. However, this method will not block for all those actions: it only
         * kicks off the connection attempt via {@code initiateMessaging} and registers a listener that
         * routes the outcome to {@link #onCompletedHandshake(Result)} or {@link #onFailure(Throwable)}.
         * A cancelled attempt is ignored.
         *
         * Note: this should only be invoked on the event loop.
         *
         * @param result             the promise passed through to {@code initiateMessaging} for this attempt
         * @param sslFallbackEnabled whether this attempt may use an SSL fallback strategy other than the
         *                           first {@link SslFallbackConnectionType}; only honoured when the settings
         *                           have encryption enabled and marked optional
         */
        private void attempt(Promise<Result<MessagingSuccess>> result, boolean sslFallbackEnabled)
        {
            ++connectionAttempts;

            /*
             * Re-evaluate messagingVersion before re-attempting the connection in case
             * endpointToVersion were updated. This happens if the outbound connection
             * is made before the endpointToVersion table is initially constructed or out
             * of date (e.g. if outbound connections are established for gossip
             * as a result of an inbound connection) and can result in the wrong outbound
             * port being selected if configured with legacy_ssl_storage_port_enabled=true.
             */
            int knownMessagingVersion = messagingVersion();
            if (knownMessagingVersion != messagingVersion)
            {
                logger.trace("Endpoint version changed from {} to {} since connection initialized, updating.",
                             messagingVersion, knownMessagingVersion);
                messagingVersion = knownMessagingVersion;
            }

            settings = template;
            // never attempt a version newer than the highest one these settings accept
            if (messagingVersion > settings.acceptVersions.max)
                messagingVersion = settings.acceptVersions.max;

            // In mixed mode operation, some nodes might be configured to use SSL for internode connections and
            // others might be configured to not use SSL. When a node is configured in optional SSL mode, it should
            // be able to handle SSL and non-SSL internode connections. We take care of this when accepting non-SSL
            // connections in the inbound connection by having an optional SSL handler for inbound connections.
            // For outbound connections, if the authentication fails, we should fall back to other SSL strategies
            // while talking to older nodes in the cluster which are configured to make non-SSL connections.
            SslFallbackConnectionType[] fallBackSslFallbackConnectionTypes = SslFallbackConnectionType.values();
            // Take the remainder BEFORE narrowing to int: casting the attempt counter first would make the
            // index negative (and the array access throw) once the counter no longer fits in an int.
            int index = sslFallbackEnabled && settings.withEncryption() && settings.encryption.getOptional() ?
                        (int) ((connectionAttempts - 1) % fallBackSslFallbackConnectionTypes.length) : 0;
            SslFallbackConnectionType fallbackType = fallBackSslFallbackConnectionTypes[index];

            if (fallbackType != SslFallbackConnectionType.SERVER_CONFIG)
            {
                logger.info("ConnectionId {} is falling back to {} reconnect strategy for retry", id(), fallbackType);
            }

            initiateMessaging(eventLoop, type, fallbackType, settings, result)
            .addListener(future -> {
                if (future.isCancelled())
                    return;
                if (future.isSuccess()) //noinspection unchecked
                    onCompletedHandshake((Result<MessagingSuccess>) future.getNow());
                else
                    onFailure(future.cause());
            });
        }

        /**
         * Moves the connection into a {@code Connecting} state tracked by a new promise and immediately
         * makes the first attempt, with SSL fallback disabled.
         *
         * @return the promise passed to that attempt
         */
        Future<Result<MessagingSuccess>> initiate()
        {
            Promise<Result<MessagingSuccess>> result = AsyncPromise.withExecutor(eventLoop);
            state = new Connecting(state.disconnected(), result);
            attempt(result, false);
            return result;
        }
    }

    return new Initiate().initiate();
}