in core/src/main/java/com/datastax/oss/driver/internal/core/session/DefaultSession.java [331:412]
/**
 * Runs the session initialization sequence, at most once.
 *
 * <p>Must be invoked on {@code adminExecutor}'s event loop (asserted below; the check is only
 * active when assertions are enabled). Any invocation after the first returns immediately.
 *
 * <p>The outcome is reported through {@code initFuture}:
 *
 * <ul>
 *   <li>if eagerly fetching one of the context components throws, {@code closePolicies} is run,
 *       the Netty options are closed, and {@code initFuture} fails with the thrown error once
 *       that close has finished;
 *   <li>if any stage of the asynchronous chain fails, {@code forceCloseAsync()} is called and
 *       {@code initFuture} fails with the original error (a failure of the close itself is
 *       attached to it as a suppressed exception);
 *   <li>otherwise listeners are notified and {@code initFuture} completes with the enclosing
 *       {@code DefaultSession}.
 * </ul>
 *
 * @param keyspace forwarded as-is to {@code initializePools}, the last stage of the chain.
 */
private void init(CqlIdentifier keyspace) {
  assert adminExecutor.inEventLoop();
  if (initWasCalled) {
    // Only the first invocation does any work.
    return;
  }
  initWasCalled = true;
  LOG.debug("[{}] Starting initialization", logPrefix);

  // Touch every user-facing component before opening any connection: if the configuration is
  // broken, we want to find out now. The returned values are deliberately discarded.
  try {
    context.getLoadBalancingPolicies();
    context.getRetryPolicies();
    context.getSpeculativeExecutionPolicies();
    context.getReconnectionPolicy();
    context.getAddressTranslator();
    context.getNodeStateListener();
    context.getSchemaChangeListener();
    context.getRequestTracker();
    context.getRequestThrottler();
    context.getAuthProvider();
    context.getSslHandlerFactory();
    context.getTimestampGenerator();
  } catch (Throwable eagerFetchError) {
    RunOrSchedule.on(adminExecutor, this::closePolicies);
    context
        .getNettyOptions()
        .onClose()
        .addListener(
            closeOutcome -> {
              // A failure to close is only logged: the error reported to the caller is the
              // one that interrupted initialization.
              if (!closeOutcome.isSuccess()) {
                Loggers.warnWithException(
                    LOG,
                    "[{}] Error while closing NettyOptions "
                        + "(suppressed because we're already handling an init failure)",
                    logPrefix,
                    closeOutcome.cause());
              }
              initFuture.completeExceptionally(eagerFetchError);
            });
    // The decrement is a real side effect, not just a log detail: method arguments are
    // evaluated before the call, so it happens whether or not debug logging is enabled.
    LOG.debug(
        "Error initializing new session {} ({} live instances)",
        context.getSessionName(),
        INSTANCE_COUNT.decrementAndGet());
    return;
  }

  // From here on, the live instance counter is decremented when closeFuture completes
  // (normally or exceptionally).
  closeFuture.whenComplete(
      (ignored, closeError) ->
          LOG.debug(
              "Closing session {} ({} live instances)",
              context.getSessionName(),
              INSTANCE_COUNT.decrementAndGet()));

  MetadataManager metadataManager = context.getMetadataManager();
  metadataManager.addContactPoints(initialContactPoints);

  // Each stage starts only after the previous one has completed successfully; the first
  // failure skips the remaining stages and is handled in the final callback.
  context
      .getTopologyMonitor()
      .init()
      .thenCompose(ignored -> metadataManager.refreshNodes())
      .thenCompose(ignored -> checkProtocolVersion())
      .thenCompose(ignored -> initialSchemaRefresh())
      .thenCompose(ignored -> initializePools(keyspace))
      .whenComplete(
          (ignored, initFailure) -> {
            if (initFailure != null) {
              LOG.debug("[{}] Initialization failed, force closing", logPrefix, initFailure);
              forceCloseAsync()
                  .whenComplete(
                      (closed, closeFailure) -> {
                        // Keep the init failure as the primary error; a failure of the close
                        // itself only rides along as a suppressed exception.
                        if (closeFailure != null) {
                          initFailure.addSuppressed(closeFailure);
                        }
                        initFuture.completeExceptionally(initFailure);
                      });
              return;
            }
            LOG.debug("[{}] Initialization complete, ready", logPrefix);
            notifyListeners();
            initFuture.complete(DefaultSession.this);
          });
}