in core/src/main/java/com/datastax/oss/driver/internal/core/channel/ChannelFactory.java [180:268]
/**
 * Performs one connection attempt to {@code endPoint} with {@code currentVersion} and reports the
 * outcome through {@code resultFuture}.
 *
 * <p>On success, the Netty channel is wrapped in a {@link DriverChannel}; the factory's protocol
 * version (only when negotiating), cluster name and product type (each only if not set yet) are
 * recorded, and {@code resultFuture} is completed with the new channel. If the detected product
 * type equals {@code DATASTAX_CLOUD_PRODUCT_TYPE} and the configuration is a {@link
 * TypesafeDriverConfig}, the default request consistency is overridden to {@code LOCAL_QUORUM}.
 *
 * <p>On failure with an {@link UnsupportedProtocolVersionException} while negotiating, the
 * current version is appended to {@code attemptedVersions} and the method calls itself again with
 * the downgraded version supplied by the protocol version registry; when no downgrade is
 * available, {@code resultFuture} fails with the exception built by {@code forNegotiation}. Any
 * other failure is passed to {@code resultFuture} unchanged.
 *
 * @param endPoint the node address to connect to
 * @param options channel options handed to the channel initializer
 * @param nodeMetricUpdater metric updater handed to the channel initializer
 * @param currentVersion the protocol version used for this attempt
 * @param isNegotiating whether a protocol downgrade may be attempted if this version is rejected
 * @param attemptedVersions the versions already tried; mutated when a downgrade is attempted
 * @param resultFuture the future completed with the resulting channel or the failure
 */
private void connect(
    EndPoint endPoint,
    DriverChannelOptions options,
    NodeMetricUpdater nodeMetricUpdater,
    ProtocolVersion currentVersion,
    boolean isNegotiating,
    List<ProtocolVersion> attemptedVersions,
    CompletableFuture<DriverChannel> resultFuture) {
  NettyOptions netty = context.getNettyOptions();

  Bootstrap clientBootstrap = new Bootstrap();
  clientBootstrap.group(netty.ioEventLoopGroup());
  clientBootstrap.channel(netty.channelClass());
  clientBootstrap.option(ChannelOption.ALLOCATOR, netty.allocator());
  clientBootstrap.handler(
      initializer(endPoint, currentVersion, options, nodeMetricUpdater, resultFuture));
  // Hook for NettyOptions to adjust the bootstrap before the connection is opened.
  netty.afterBootstrapInitialized(clientBootstrap);

  ChannelFuture pending = clientBootstrap.connect(endPoint.resolve());
  pending.addListener(
      cf -> {
        if (!pending.isSuccess()) {
          Throwable failure = pending.cause();
          boolean versionRejected =
              isNegotiating && failure instanceof UnsupportedProtocolVersionException;
          if (!versionRejected) {
            // The initializer may already have failed the future by the time we get here; a
            // second completion attempt is harmless.
            resultFuture.completeExceptionally(failure);
            return;
          }

          attemptedVersions.add(currentVersion);
          Optional<ProtocolVersion> fallback =
              context.getProtocolVersionRegistry().downgrade(currentVersion);
          if (!fallback.isPresent()) {
            // Nothing lower left to try: report every version that was attempted.
            resultFuture.completeExceptionally(
                UnsupportedProtocolVersionException.forNegotiation(endPoint, attemptedVersions));
            return;
          }

          LOG.debug(
              "[{}] Failed to connect with protocol {}, retrying with {}",
              logPrefix,
              currentVersion,
              fallback.get());
          connect(
              endPoint,
              options,
              nodeMetricUpdater,
              fallback.get(),
              true,
              attemptedVersions,
              resultFuture);
          return;
        }

        DriverChannel connected =
            new DriverChannel(
                endPoint, pending.channel(), context.getWriteCoalescer(), currentVersion);

        // Remember what this connection taught us so that later connections can reuse it.
        if (isNegotiating) {
          ChannelFactory.this.protocolVersion = currentVersion;
        }
        if (ChannelFactory.this.clusterName == null) {
          ChannelFactory.this.clusterName = connected.getClusterName();
        }

        Map<String, List<String>> serverOptions = connected.getOptions();
        if (ChannelFactory.this.productType == null && serverOptions != null) {
          List<String> advertised = serverOptions.get("PRODUCT_TYPE");
          String detected;
          if (advertised == null || advertised.isEmpty()) {
            detected = UNKNOWN_PRODUCT_TYPE;
          } else {
            detected = advertised.get(0);
          }
          ChannelFactory.this.productType = detected;

          DriverConfig driverConfig = context.getConfig();
          if (driverConfig instanceof TypesafeDriverConfig
              && detected.equals(DATASTAX_CLOUD_PRODUCT_TYPE)) {
            // Cloud product type: switch the default request consistency to LOCAL_QUORUM.
            TypesafeDriverConfig typesafeConfig = (TypesafeDriverConfig) driverConfig;
            typesafeConfig.overrideDefaults(
                ImmutableMap.of(
                    DefaultDriverOption.REQUEST_CONSISTENCY, ConsistencyLevel.LOCAL_QUORUM.name()));
          }
        }

        resultFuture.complete(connected);
      });
}