private void connect()

in core/src/main/java/com/datastax/oss/driver/internal/core/control/ControlConnection.java [352:458]


    /**
     * Tries to open the control channel on the next candidate in {@code nodes}, moving on to the
     * following candidate each time an attempt fails or the node turns out not to be usable.
     *
     * <p>Must be invoked on the admin executor's event loop (asserted). The result of the channel
     * factory is also processed on {@code adminExecutor}.
     *
     * @param nodes the remaining candidates; consumed from the head with {@link Queue#poll()}.
     * @param errors the failures recorded for the nodes tried so far, or {@code null} if there are
     *     none yet. When non-null, the list is appended to in place.
     * @param onSuccess run once a new channel has been installed, and also when the attempt is
     *     abandoned because the control connection was closed or {@code initFuture} was cancelled.
     * @param onFailure called with the result of {@code AllNodesFailedException.fromErrors(errors)}
     *     once {@code nodes} is exhausted.
     */
    private void connect(
        Queue<Node> nodes,
        List<Entry<Node, Throwable>> errors,
        Runnable onSuccess,
        Consumer<Throwable> onFailure) {
      assert adminExecutor.inEventLoop();
      Node node = nodes.poll();
      if (node == null) {
        // No candidate left: report everything that was collected along the way.
        onFailure.accept(AllNodesFailedException.fromErrors(errors));
      } else {
        LOG.debug("[{}] Trying to establish a connection to {}", logPrefix, node);
        context
            .getChannelFactory()
            .connect(node, channelOptions)
            .whenCompleteAsync(
                (channel, error) -> {
                  try {
                    // Last known distance/state of the node, used below to detect that the node
                    // stopped being a valid target while the connection attempt was in flight.
                    NodeDistance lastDistance = lastNodeDistance.get(node);
                    NodeState lastState = lastNodeState.get(node);
                    if (error != null) {
                      if (closeWasCalled || initFuture.isCancelled()) {
                        onSuccess.run(); // abort, we don't really care about the result
                      } else {
                        if (error instanceof AuthenticationException) {
                          Loggers.warnWithException(
                              LOG, "[{}] Authentication error", logPrefix, error);
                        } else if (config
                            .getDefaultProfile()
                            .getBoolean(DefaultDriverOption.CONNECTION_WARN_INIT_ERROR)) {
                          Loggers.warnWithException(
                              LOG,
                              "[{}] Error connecting to {}, trying next node",
                              logPrefix,
                              node,
                              error);
                        } else {
                          LOG.debug(
                              "[{}] Error connecting to {}, trying next node",
                              logPrefix,
                              node,
                              error);
                        }
                        // Record the failure for this node, then try the next candidate.
                        List<Entry<Node, Throwable>> newErrors =
                            (errors == null) ? new ArrayList<>() : errors;
                        newErrors.add(new SimpleEntry<>(node, error));
                        context.getEventBus().fire(ChannelEvent.controlConnectionFailed(node));
                        connect(nodes, newErrors, onSuccess, onFailure);
                      }
                    } else if (closeWasCalled || initFuture.isCancelled()) {
                      LOG.debug(
                          "[{}] New channel opened ({}) but the control connection was closed, closing it",
                          logPrefix,
                          channel);
                      channel.forceClose();
                      onSuccess.run();
                    } else if (lastDistance == NodeDistance.IGNORED) {
                      LOG.debug(
                          "[{}] New channel opened ({}) but node became ignored, "
                              + "closing and trying next node",
                          logPrefix,
                          channel);
                      channel.forceClose();
                      connect(nodes, errors, onSuccess, onFailure);
                    } else if (lastNodeState.containsKey(node)
                        && (lastState == null /*(removed)*/
                            || lastState == NodeState.FORCED_DOWN)) {
                      // A key that is present but mapped to null marks a removed node.
                      LOG.debug(
                          "[{}] New channel opened ({}) but node was removed or forced down, "
                              + "closing and trying next node",
                          logPrefix,
                          channel);
                      channel.forceClose();
                      connect(nodes, errors, onSuccess, onFailure);
                    } else {
                      LOG.debug("[{}] New channel opened {}", logPrefix, channel);
                      DriverChannel previousChannel = ControlConnection.this.channel;
                      ControlConnection.this.channel = channel;
                      if (previousChannel != null) {
                        // We were reconnecting: make sure previous channel gets closed (it may
                        // still be open if reconnection was forced)
                        LOG.debug(
                            "[{}] Forcefully closing previous channel {}",
                            logPrefix,
                            previousChannel);
                        previousChannel.forceClose();
                      }
                      context.getEventBus().fire(ChannelEvent.channelOpened(node));
                      // Hand the close notification back to the admin executor before reacting.
                      channel
                          .closeFuture()
                          .addListener(
                              f ->
                                  adminExecutor
                                      .submit(() -> onChannelClosed(channel, node))
                                      .addListener(UncaughtExceptions::log));
                      onSuccess.run();
                    }
                  } catch (Exception e) {
                    // NOTE(review): neither onSuccess nor onFailure is invoked on this path, so
                    // the caller is not notified of the outcome -- confirm this is intended.
                    Loggers.warnWithException(
                        LOG,
                        "[{}] Unexpected exception while processing channel init result",
                        logPrefix,
                        e);
                  }
                },
                adminExecutor);
      }
    }