private boolean onAllConnected()

in core/src/main/java/com/datastax/oss/driver/internal/core/pool/ChannelPool.java [307:404]


    /**
     * Processes the results of a finished batch of channel connection attempts.
     *
     * <p>Must run on the admin executor's event loop. Each entry of {@code pendingChannels} is
     * expected to be complete: successful channels are added to the pool (or force-closed if the
     * pool is closing), failures are counted in the node metrics and logged according to their
     * type. The pending list is cleared before returning.
     *
     * @param v unused.
     * @return {@code true} if reconnection should stop, either because a fatal error was
     *     encountered or because the pool holds at least {@code wantedCount} channels; {@code
     *     false} otherwise.
     */
    private boolean onAllConnected(@SuppressWarnings("unused") Void v) {
      assert adminExecutor.inEventLoop();
      Throwable fatal = null;
      int keyspaceFailures = 0;
      for (CompletionStage<DriverChannel> stage : pendingChannels) {
        CompletableFuture<DriverChannel> attempt = stage.toCompletableFuture();
        assert attempt.isDone();

        // Success path first: register the channel, or discard it if the pool is going away.
        if (!attempt.isCompletedExceptionally()) {
          DriverChannel channel = CompletableFutures.getCompleted(attempt);
          if (isClosing) {
            LOG.debug(
                "[{}] New channel added ({}) but the pool was closed, closing it",
                logPrefix,
                channel);
            channel.forceClose();
            continue;
          }
          LOG.debug("[{}] New channel added {}", logPrefix, channel);
          channels.add(channel);
          eventBus.fire(ChannelEvent.channelOpened(node));
          // Both close notifications are re-dispatched to the admin executor.
          channel
              .closeStartedFuture()
              .addListener(
                  f ->
                      adminExecutor
                          .submit(() -> onChannelCloseStarted(channel))
                          .addListener(UncaughtExceptions::log));
          channel
              .closeFuture()
              .addListener(
                  f ->
                      adminExecutor
                          .submit(() -> onChannelClosed(channel))
                          .addListener(UncaughtExceptions::log));
          continue;
        }

        // Failure path: record the metric, then classify the error.
        Throwable failure = CompletableFutures.getFailed(attempt);
        DefaultNodeMetric failureMetric;
        if (failure instanceof AuthenticationException) {
          failureMetric = DefaultNodeMetric.AUTHENTICATION_ERRORS;
        } else {
          failureMetric = DefaultNodeMetric.CONNECTION_INIT_ERRORS;
        }
        ((DefaultNode) node).getMetricUpdater().incrementCounter(failureMetric, null);

        if (failure instanceof ClusterNameMismatchException
            || failure instanceof UnsupportedProtocolVersionException) {
          // Every channel will probably report the same thing; remember it and let the loop
          // run to completion.
          fatal = failure;
        } else if (failure instanceof AuthenticationException) {
          // This most likely requires operator intervention, so it is always a warning. The
          // pool keeps reconnecting in case it gets fixed without restarting the client.
          Loggers.warnWithException(LOG, "[{}] Authentication error", logPrefix, failure);
        } else if (failure instanceof InvalidKeyspaceException) {
          keyspaceFailures++;
        } else {
          boolean warnOnInitError =
              config
                  .getDefaultProfile()
                  .getBoolean(DefaultDriverOption.CONNECTION_WARN_INIT_ERROR);
          if (warnOnInitError) {
            Loggers.warnWithException(
                LOG, "[{}]  Error while opening new channel", logPrefix, failure);
          } else {
            LOG.debug("[{}]  Error while opening new channel", logPrefix, failure);
          }
        }
      }

      // The keyspace is assumed to be wrong only when every single attempt failed because of it.
      // This must be evaluated before the pending list is emptied.
      invalidKeyspace = keyspaceFailures > 0 && keyspaceFailures == pendingChannels.size();

      pendingChannels.clear();

      if (fatal != null) {
        Loggers.warnWithException(
            LOG,
            "[{}] Fatal error while initializing pool, forcing the node down",
            logPrefix,
            fatal);
        // Note: getBroadcastRpcAddress() can only be empty for the control node (and not for
        // modern C* versions anyway). If a control connection is already open to that node, a
        // protocol version or cluster name mismatch cannot happen while creating the pool, so
        // that case can safely be ignored.
        node.getBroadcastRpcAddress()
            .ifPresent(address -> eventBus.fire(TopologyEvent.forceDown(address)));
        // No point in going further, the pool is about to be shut down anyway.
        return true;
      }

      // The pool may have been shrunk while the reconnection was in progress.
      shrinkIfTooManyChannels();

      int openCount = channels.size();
      LOG.debug(
          "[{}] Reconnection attempt complete, {}/{} channels",
          logPrefix,
          openCount,
          wantedCount);
      // Reconnection stops once the wanted count is reached.
      return openCount >= wantedCount;
    }