in core/src/main/java/com/datastax/oss/driver/internal/core/control/ControlConnection.java [352:458]
/**
 * Tries to open the control channel to the next candidate polled from {@code nodes}; when an
 * attempt fails or the node turns out to be unusable, recurses to try the following candidate.
 *
 * <p>Must be invoked on the admin executor (asserted below). The result of the channel factory
 * is also processed on {@code adminExecutor}.
 *
 * @param nodes the candidate nodes; one element is polled per attempt, so the queue is consumed
 * @param errors the connection errors accumulated so far; may be {@code null}, in which case a
 *     list is created when the first error is recorded
 * @param onSuccess run once a new channel has been installed, and also when the attempt is
 *     abandoned because close was called or the init future was cancelled
 * @param onFailure receives the exception built from {@code errors} by {@code
 *     AllNodesFailedException.fromErrors} once {@code nodes} is exhausted
 */
private void connect(
    Queue<Node> nodes,
    List<Entry<Node, Throwable>> errors,
    Runnable onSuccess,
    Consumer<Throwable> onFailure) {
  assert adminExecutor.inEventLoop();
  Node node = nodes.poll();
  if (node == null) {
    // No candidate left: report everything that was collected along the way.
    onFailure.accept(AllNodesFailedException.fromErrors(errors));
  } else {
    LOG.debug("[{}] Trying to establish a connection to {}", logPrefix, node);
    context
        .getChannelFactory()
        .connect(node, channelOptions)
        .whenCompleteAsync(
            (channel, error) -> {
              try {
                // Last known distance/state recorded for this node, read at completion time so
                // that changes that happened while the channel was opening are taken into account.
                NodeDistance lastDistance = lastNodeDistance.get(node);
                NodeState lastState = lastNodeState.get(node);
                if (error != null) {
                  if (closeWasCalled || initFuture.isCancelled()) {
                    onSuccess.run(); // abort, we don't really care about the result
                  } else {
                    if (error instanceof AuthenticationException) {
                      Loggers.warnWithException(
                          LOG, "[{}] Authentication error", logPrefix, error);
                    } else if (config
                        .getDefaultProfile()
                        .getBoolean(DefaultDriverOption.CONNECTION_WARN_INIT_ERROR)) {
                      Loggers.warnWithException(
                          LOG,
                          "[{}] Error connecting to {}, trying next node",
                          logPrefix,
                          node,
                          error);
                    } else {
                      LOG.debug(
                          "[{}] Error connecting to {}, trying next node",
                          logPrefix,
                          node,
                          error);
                    }
                    // Record the failure, signal it on the event bus, then move on to the next
                    // candidate with the updated error list.
                    List<Entry<Node, Throwable>> newErrors =
                        (errors == null) ? new ArrayList<>() : errors;
                    newErrors.add(new SimpleEntry<>(node, error));
                    context.getEventBus().fire(ChannelEvent.controlConnectionFailed(node));
                    connect(nodes, newErrors, onSuccess, onFailure);
                  }
                } else if (closeWasCalled || initFuture.isCancelled()) {
                  // The channel opened, but nobody wants it anymore: discard it.
                  LOG.debug(
                      "[{}] New channel opened ({}) but the control connection was closed, closing it",
                      logPrefix,
                      channel);
                  channel.forceClose();
                  onSuccess.run();
                } else if (lastDistance == NodeDistance.IGNORED) {
                  LOG.debug(
                      "[{}] New channel opened ({}) but node became ignored, "
                          + "closing and trying next node",
                      logPrefix,
                      channel);
                  channel.forceClose();
                  connect(nodes, errors, onSuccess, onFailure);
                } else if (lastNodeState.containsKey(node)
                    && (lastState == null /*(removed)*/
                        || lastState == NodeState.FORCED_DOWN)) {
                  // containsKey distinguishes "an entry exists with a null state" (treated as
                  // removed) from "no entry at all for this node".
                  LOG.debug(
                      "[{}] New channel opened ({}) but node was removed or forced down, "
                          + "closing and trying next node",
                      logPrefix,
                      channel);
                  channel.forceClose();
                  connect(nodes, errors, onSuccess, onFailure);
                } else {
                  LOG.debug("[{}] New channel opened {}", logPrefix, channel);
                  DriverChannel previousChannel = ControlConnection.this.channel;
                  ControlConnection.this.channel = channel;
                  if (previousChannel != null) {
                    // We were reconnecting: make sure previous channel gets closed (it may
                    // still be open if reconnection was forced)
                    LOG.debug(
                        "[{}] Forcefully closing previous channel {}",
                        logPrefix,
                        previousChannel);
                    previousChannel.forceClose();
                  }
                  context.getEventBus().fire(ChannelEvent.channelOpened(node));
                  // When the new channel closes, handle it on the admin executor and log any
                  // failure of that submitted task.
                  channel
                      .closeFuture()
                      .addListener(
                          f ->
                              adminExecutor
                                  .submit(() -> onChannelClosed(channel, node))
                                  .addListener(UncaughtExceptions::log));
                  onSuccess.run();
                }
              } catch (Exception e) {
                // NOTE(review): this path only logs; neither onSuccess nor onFailure is invoked
                // here, so the caller is not notified if processing itself throws -- confirm
                // that this is intended.
                Loggers.warnWithException(
                    LOG,
                    "[{}] Unexpected exception while processing channel init result",
                    logPrefix,
                    e);
              }
            },
            adminExecutor);
  }
}