in core/src/main/java/com/datastax/oss/driver/internal/core/channel/ChannelFactory.java [304:392]
protected void initChannel(Channel channel) {
try {
DriverExecutionProfile defaultConfig = context.getConfig().getDefaultProfile();
long setKeyspaceTimeoutMillis =
defaultConfig
.getDuration(DefaultDriverOption.CONNECTION_SET_KEYSPACE_TIMEOUT)
.toMillis();
int maxFrameLength =
(int) defaultConfig.getBytes(DefaultDriverOption.PROTOCOL_MAX_FRAME_LENGTH);
int maxRequestsPerConnection =
defaultConfig.getInt(DefaultDriverOption.CONNECTION_MAX_REQUESTS);
int maxOrphanRequests =
defaultConfig.getInt(DefaultDriverOption.CONNECTION_MAX_ORPHAN_REQUESTS);
if (maxOrphanRequests >= maxRequestsPerConnection) {
if (LOGGED_ORPHAN_WARNING.compareAndSet(false, true)) {
LOG.warn(
"[{}] Invalid value for {}: {}. It must be lower than {}. "
+ "Defaulting to {} (1/4 of max-requests) instead.",
logPrefix,
DefaultDriverOption.CONNECTION_MAX_ORPHAN_REQUESTS.getPath(),
maxOrphanRequests,
DefaultDriverOption.CONNECTION_MAX_REQUESTS.getPath(),
maxRequestsPerConnection / 4);
}
maxOrphanRequests = maxRequestsPerConnection / 4;
}
InFlightHandler inFlightHandler =
new InFlightHandler(
protocolVersion,
new StreamIdGenerator(maxRequestsPerConnection),
maxOrphanRequests,
setKeyspaceTimeoutMillis,
channel.newPromise(),
options.eventCallback,
options.ownerLogPrefix);
HeartbeatHandler heartbeatHandler = new HeartbeatHandler(defaultConfig);
ProtocolInitHandler initHandler =
new ProtocolInitHandler(
context,
protocolVersion,
clusterName,
endPoint,
options,
heartbeatHandler,
productType == null);
ChannelPipeline pipeline = channel.pipeline();
context
.getSslHandlerFactory()
.map(f -> f.newSslHandler(channel, endPoint))
.map(h -> pipeline.addLast(SSL_HANDLER_NAME, h));
// Only add meter handlers on the pipeline if metrics are enabled.
SessionMetricUpdater sessionMetricUpdater = context.getMetricsFactory().getSessionUpdater();
if (nodeMetricUpdater.isEnabled(DefaultNodeMetric.BYTES_RECEIVED, null)
|| sessionMetricUpdater.isEnabled(DefaultSessionMetric.BYTES_RECEIVED, null)) {
pipeline.addLast(
INBOUND_TRAFFIC_METER_NAME,
new InboundTrafficMeter(nodeMetricUpdater, sessionMetricUpdater));
}
if (nodeMetricUpdater.isEnabled(DefaultNodeMetric.BYTES_SENT, null)
|| sessionMetricUpdater.isEnabled(DefaultSessionMetric.BYTES_SENT, null)) {
pipeline.addLast(
OUTBOUND_TRAFFIC_METER_NAME,
new OutboundTrafficMeter(nodeMetricUpdater, sessionMetricUpdater));
}
pipeline
.addLast(
FRAME_TO_BYTES_ENCODER_NAME,
new FrameEncoder(context.getFrameCodec(), maxFrameLength))
.addLast(
BYTES_TO_FRAME_DECODER_NAME,
new FrameDecoder(context.getFrameCodec(), maxFrameLength))
// Note: HeartbeatHandler is inserted here once init completes
.addLast(INFLIGHT_HANDLER_NAME, inFlightHandler)
.addLast(INIT_HANDLER_NAME, initHandler);
context.getNettyOptions().afterChannelInitialized(channel);
} catch (Throwable t) {
// If the init handler throws an exception, Netty swallows it and closes the channel. We
// want to propagate it instead, so fail the outer future (the result of connect()).
resultFuture.completeExceptionally(t);
throw t;
}
}