Future initiate()

in src/java/org/apache/cassandra/net/OutboundConnection.java [1081:1258]


    /**
     * Start establishing a connection to the peer.
     *
     * All of the work is delegated to a method-local {@code Initiate} helper, which carries the state
     * shared across a sequence of attempts (retry delay, messaging version, settings), so that each call
     * to this method begins with a fresh retry delay.
     *
     * @return the promise handed to the first connection attempt; retries scheduled after a failure
     *         create their own promise (see {@code onFailure}), so this future only describes the first attempt
     */
    Future<?> initiate()
    {
        class Initiate
        {
            /**
             * If we fail to connect, we want to try and connect again before any messages timeout.
             * However, we update this each time to ensure we do not retry unreasonably often, and settle on a periodicity
             * that might lead to timeouts in some aggressive systems.
             *
             * The delay actually used for a retry is clamped to at least 100ms, and this value is doubled
             * after each failure up to a ceiling of 1000ms (see {@link #onFailure}).
             */
            long retryRateMillis = DatabaseDescriptor.getMinRpcTimeout(MILLISECONDS) / 2;

            // our connection settings, possibly updated on retry
            int messagingVersion = template.endpointToVersion().get(template.to);
            // assigned from the template at the start of every attempt(); null until the first attempt runs
            OutboundConnectionSettings settings;

            /**
             * If we failed for any reason, try again.
             *
             * Logs the failure ({@link ConnectException} at info, anything else at error), lets the
             * {@code JVMStabilityInspector} examine the cause, and then either schedules another attempt
             * (if there is still something pending to send) or parks the connection in a dormant
             * disconnected state.
             *
             * @param cause the reason the connection attempt (or handshake) failed
             */
            void onFailure(Throwable cause)
            {
                if (cause instanceof ConnectException)
                    noSpamLogger.info("{} failed to connect", id(), cause);
                else
                    noSpamLogger.error("{} failed to connect", id(), cause);

                JVMStabilityInspector.inspectThrowable(cause);

                if (hasPending())
                {
                    // SSL fallback strategies are only cycled through on the retry if this failure was an SSL error
                    boolean isSSLFailure = isSSLError(cause);
                    Promise<Result<MessagingSuccess>> result = AsyncPromise.withExecutor(eventLoop);
                    // the scheduled task is stored in the Connecting state alongside the new promise
                    state = new Connecting(state.disconnected(), result, eventLoop.schedule(() -> attempt(result, isSSLFailure), max(100, retryRateMillis), MILLISECONDS));
                    // back off: double the delay for the next failure, capped at one second
                    retryRateMillis = min(1000, retryRateMillis * 2);
                }
                else
                {
                    // this Initiate will be discarded
                    state = Disconnected.dormant(state.disconnected().maintenance);
                }
            }

            /**
             * Act on the outcome of a handshake that completed without throwing.
             *
             * <ul>
             *   <li>SUCCESS: move to the {@code Established} state and install a pipeline handler that
             *       reacts to the channel closing or raising an exception.</li>
             *   <li>RETRY: record the messaging version supplied by the peer and start over.</li>
             *   <li>INCOMPATIBLE: treat as an ordinary failure, so the usual retry policy applies.</li>
             * </ul>
             *
             * @param result the handshake result to dispatch on
             */
            void onCompletedHandshake(Result<MessagingSuccess> result)
            {
                switch (result.outcome)
                {
                    case SUCCESS:
                        // it is expected that close, if successful, has already cancelled us; so we do not need to worry about leaking connections
                        assert !state.isClosed();

                        MessagingSuccess success = result.success();
                        debug.onConnect(success.messagingVersion, settings);
                        // we are connected, so cancel the maintenance task held by the disconnected state
                        state.disconnected().maintenance.cancel(false);

                        FrameEncoder.PayloadAllocator payloadAllocator = success.allocator;
                        Channel channel = success.channel;
                        Established established = new Established(success.messagingVersion, channel, payloadAllocator, settings);
                        state = established;
                        // the handler captures this specific Established instance, so later channel events
                        // are reported against the connection they belong to
                        channel.pipeline().addLast("handleExceptionalStates", new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelInactive(ChannelHandlerContext ctx)
                            {
                                disconnectNow(established);
                                ctx.fireChannelInactive();
                            }

                            @Override
                            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                            {
                                // the exception is consumed here (not forwarded down the pipeline); a failure
                                // while invalidating the channel is logged rather than rethrown
                                try
                                {
                                    invalidateChannel(established, cause);
                                }
                                catch (Throwable t)
                                {
                                    logger.error("Unexpected exception in {}.exceptionCaught", this.getClass().getSimpleName(), t);
                                }
                            }
                        });
                        ++successfulConnections;

                        logger.info("{} successfully connected, version = {}, framing = {}, encryption = {}",
                                    id(true),
                                    success.messagingVersion,
                                    settings.framing,
                                    encryptionConnectionSummary(channel));
                        break;

                    case RETRY:
                        if (logger.isTraceEnabled())
                            logger.trace("{} incorrect legacy peer version predicted; reconnecting", id());

                        // the messaging version we connected with was incorrect; try again with the one supplied by the remote host
                        messagingVersion = result.retry().withMessagingVersion;
                        settings.endpointToVersion.set(settings.to, messagingVersion);

                        // resolves to this Initiate's own initiate(), so the current retryRateMillis is retained
                        initiate();
                        break;

                    case INCOMPATIBLE:
                        // we cannot communicate with this peer given its messaging version; mark this as any other failure, and continue trying
                        Throwable t = new IOException(String.format("Incompatible peer: %s, messaging version: %s",
                                                                    settings.to, result.incompatible().maxMessagingVersion));
                        t.fillInStackTrace();
                        onFailure(t);
                        break;

                    default:
                        throw new AssertionError();
                }
            }

            /**
             * Initiate all the actions required to establish a working, valid connection. This includes
             * opening the socket, negotiating the internode messaging handshake, and setting up the working
             * Netty {@link Channel}. However, this method will not block for all those actions: it will only
             * kick off the connection attempt and register a listener that dispatches to
             * {@link #onCompletedHandshake} or {@link #onFailure} once the attempt finishes; a cancelled
             * attempt is ignored.
             *
             * Note: this should only be invoked on the event loop.
             *
             * @param result             the promise passed through to {@code initiateMessaging} for this attempt
             * @param sslFallbackEnabled whether this attempt may use an SSL fallback strategy other than the
             *                           first one; only honoured if encryption is enabled and optional
             */
            private void attempt(Promise<Result<MessagingSuccess>> result, boolean sslFallbackEnabled)
            {
                ++connectionAttempts;

                /*
                 * Re-evaluate messagingVersion before re-attempting the connection in case
                 * endpointToVersion were updated. This happens if the outbound connection
                 * is made before the endpointToVersion table is initially constructed or out
                 * of date (e.g. if outbound connections are established for gossip
                 * as a result of an inbound connection) and can result in the wrong outbound
                 * port being selected if configured with legacy_ssl_storage_port_enabled=true.
                 */
                int knownMessagingVersion = messagingVersion();
                if (knownMessagingVersion != messagingVersion)
                {
                    logger.trace("Endpoint version changed from {} to {} since connection initialized, updating.",
                                 messagingVersion, knownMessagingVersion);
                    messagingVersion = knownMessagingVersion;
                }

                settings = template;
                // never attempt a version newer than the maximum we accept
                if (messagingVersion > settings.acceptVersions.max)
                    messagingVersion = settings.acceptVersions.max;

                // In mixed mode operation, some nodes might be configured to use SSL for internode connections and
                // others might be configured to not use SSL. When a node is configured in optional SSL mode, It should
                // be able to handle SSL and Non-SSL internode connections. We take care of this when accepting NON-SSL
                // connection in Inbound connection by having optional SSL handler for inbound connections.
                // For outbound connections, if the authentication fails, we should fall back to other SSL strategies
                // while talking to older nodes in the cluster which are configured to make NON-SSL connections
                SslFallbackConnectionType[] fallBackSslFallbackConnectionTypes = SslFallbackConnectionType.values();
                // Rotate through the strategies by attempt number. The remainder is taken BEFORE narrowing to int:
                // a cast binds tighter than %, so casting first could truncate a large attempt counter to a
                // negative int, and a negative dividend yields a negative (out of bounds) index.
                int index = sslFallbackEnabled && settings.withEncryption() && settings.encryption.getOptional() ?
                            (int) ((connectionAttempts - 1) % fallBackSslFallbackConnectionTypes.length) : 0;
                if (fallBackSslFallbackConnectionTypes[index] != SslFallbackConnectionType.SERVER_CONFIG)
                {
                    logger.info("ConnectionId {} is falling back to {} reconnect strategy for retry", id(), fallBackSslFallbackConnectionTypes[index]);
                }
                initiateMessaging(eventLoop, type, fallBackSslFallbackConnectionTypes[index], settings, result)
                .addListener(future -> {
                    if (future.isCancelled())
                        return;
                    if (future.isSuccess()) //noinspection unchecked
                        onCompletedHandshake((Result<MessagingSuccess>) future.getNow());
                    else
                        onFailure(future.cause());
                });
            }

            /**
             * Move to the {@code Connecting} state with a fresh promise and make the first attempt
             * immediately, with SSL fallback disabled.
             *
             * @return the promise created for this attempt
             */
            Future<Result<MessagingSuccess>> initiate()
            {
                Promise<Result<MessagingSuccess>> result = AsyncPromise.withExecutor(eventLoop);
                state = new Connecting(state.disconnected(), result);
                attempt(result, false);
                return result;
            }
        }

        return new Initiate().initiate();
    }