in core/src/main/java/com/datastax/oss/driver/internal/core/pool/ChannelPool.java [307:404]
/**
 * Processes the outcome of every pending channel attempt once all of them have completed.
 *
 * <p>Each failed attempt is counted in the node's metrics and logged according to its error type.
 * Each successful channel is either registered in the pool (with close listeners attached) or, if
 * the pool is already closing, force-closed. Must run on the admin executor's event loop.
 *
 * @param v unused
 * @return {@code true} if reconnection should stop, i.e. a fatal error was encountered or the pool
 *     now holds at least the wanted number of channels; {@code false} otherwise
 */
private boolean onAllConnected(@SuppressWarnings("unused") Void v) {
  assert adminExecutor.inEventLoop();
  Throwable poolFatalError = null;
  int keyspaceFailures = 0;

  for (CompletionStage<DriverChannel> stage : pendingChannels) {
    CompletableFuture<DriverChannel> attempt = stage.toCompletableFuture();
    assert attempt.isDone();

    // Success path first: register the channel, or discard it if the pool is shutting down.
    if (!attempt.isCompletedExceptionally()) {
      DriverChannel opened = CompletableFutures.getCompleted(attempt);
      if (!isClosing) {
        LOG.debug("[{}] New channel added {}", logPrefix, opened);
        channels.add(opened);
        eventBus.fire(ChannelEvent.channelOpened(node));
        // Both close notifications are re-dispatched to the admin executor.
        opened
            .closeStartedFuture()
            .addListener(
                ignored ->
                    adminExecutor
                        .submit(() -> onChannelCloseStarted(opened))
                        .addListener(UncaughtExceptions::log));
        opened
            .closeFuture()
            .addListener(
                ignored ->
                    adminExecutor
                        .submit(() -> onChannelClosed(opened))
                        .addListener(UncaughtExceptions::log));
      } else {
        LOG.debug(
            "[{}] New channel added ({}) but the pool was closed, closing it", logPrefix, opened);
        opened.forceClose();
      }
      continue;
    }

    // Failure path: record the metric, then react according to the kind of error.
    Throwable cause = CompletableFutures.getFailed(attempt);
    boolean authFailure = cause instanceof AuthenticationException;
    DefaultNodeMetric failureMetric;
    if (authFailure) {
      failureMetric = DefaultNodeMetric.AUTHENTICATION_ERRORS;
    } else {
      failureMetric = DefaultNodeMetric.CONNECTION_INIT_ERRORS;
    }
    ((DefaultNode) node).getMetricUpdater().incrementCounter(failureMetric, null);

    if (cause instanceof ClusterNameMismatchException
        || cause instanceof UnsupportedProtocolVersionException) {
      // Every channel will probably report the same thing; remember it and let the loop finish.
      poolFatalError = cause;
    } else if (authFailure) {
      // Warn unconditionally, since the operator most likely has to act on this. Reconnection
      // goes on in case the problem can be fixed without restarting the client.
      Loggers.warnWithException(LOG, "[{}] Authentication error", logPrefix, cause);
    } else if (cause instanceof InvalidKeyspaceException) {
      keyspaceFailures++;
    } else {
      boolean warnOnInitError =
          config.getDefaultProfile().getBoolean(DefaultDriverOption.CONNECTION_WARN_INIT_ERROR);
      if (warnOnInitError) {
        Loggers.warnWithException(LOG, "[{}] Error while opening new channel", logPrefix, cause);
      } else {
        LOG.debug("[{}] Error while opening new channel", logPrefix, cause);
      }
    }
  }

  // The keyspace is considered wrong only when every single attempt failed because of it.
  invalidKeyspace = keyspaceFailures != 0 && pendingChannels.size() == keyspaceFailures;
  pendingChannels.clear();

  if (poolFatalError != null) {
    Loggers.warnWithException(
        LOG,
        "[{}] Fatal error while initializing pool, forcing the node down",
        logPrefix,
        poolFatalError);
    // Note: getBroadcastRpcAddress() can only be empty for the control node (and not for modern
    // C* versions anyway). With a control connection already open to that node, a protocol
    // version or cluster name mismatch cannot occur while creating the pool, so the empty case
    // can safely be ignored.
    node.getBroadcastRpcAddress()
        .ifPresent(rpcAddress -> eventBus.fire(TopologyEvent.forceDown(rpcAddress)));
    // No point in going further, the pool is about to be shut down.
    return true;
  }

  // The pool may have been shrunk while the reconnection was in progress.
  shrinkIfTooManyChannels();
  int openCount = channels.size();
  LOG.debug(
      "[{}] Reconnection attempt complete, {}/{} channels", logPrefix, openCount, wantedCount);
  // Reconnection stops as soon as the wanted count is reached.
  return openCount >= wantedCount;
}