in qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java [127:207]
public ScheduledExecutorService connect(final Runnable initRoutine, SSLContext sslContextOverride) throws IOException {
if (closed.get()) {
throw new IllegalStateException("Transport has already been closed");
}
if (listener == null) {
throw new IllegalStateException("A transport listener must be set before connection attempts.");
}
TransportOptions transportOptions = getTransportOptions();
EventLoopType eventLoopType = EventLoopType.valueOf(transportOptions);
int sharedEventLoopThreads = transportOptions.getSharedEventLoopThreads();
if (sharedEventLoopThreads > 0) {
groupRef = sharedGroup(eventLoopType, sharedEventLoopThreads);
} else {
groupRef = unsharedGroup(eventLoopType, ioThreadfactory);
}
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(groupRef.group());
eventLoopType.createChannel(bootstrap);
bootstrap.handler(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel connectedChannel) throws Exception {
if (initRoutine != null) {
try {
initRoutine.run();
} catch (Throwable initError) {
LOG.warn("Error during initialization of channel from provided initialization routine");
connectionFailed(connectedChannel, IOExceptionSupport.create(initError));
throw initError;
}
}
configureChannel(connectedChannel);
}
});
configureNetty(bootstrap, transportOptions);
transportOptions.setSslContextOverride(sslContextOverride);
ChannelFuture future = bootstrap.connect(getRemoteHost(), getRemotePort());
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
handleException(future.channel(), IOExceptionSupport.create(future.cause()));
}
}
});
try {
connectLatch.await();
} catch (InterruptedException ex) {
LOG.debug("Transport connection was interrupted.");
Thread.interrupted();
failureCause = IOExceptionSupport.create(ex);
}
if (failureCause != null) {
// Close out any Netty resources now as they are no longer needed.
if (channel != null) {
channel.close().syncUninterruptibly();
channel = null;
}
throw failureCause;
} else {
// Connected, allow any held async error to fire now and close the transport.
channel.eventLoop().execute(() -> {
if (failureCause != null) {
channel.pipeline().fireExceptionCaught(failureCause);
}
});
}
// returning the channel's specific event loop: the overall event loop group may be multi-threaded
return channel.eventLoop();
}