in artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java [877:1007]
/**
 * Connects to the given host and port and wraps the resulting channel in a {@link NettyConnection}.
 * <p>
 * The call blocks until the connect attempt has completed. When an {@code SslHandler} is present in
 * the channel pipeline it additionally waits (up to 30 seconds) for the TLS handshake, and when
 * {@code httpUpgradeEnabled} is set it sends an HTTP GET + Upgrade request and waits for the
 * upgrade handshake. Whenever a step fails after the channel has been opened, the channel is closed
 * before returning.
 *
 * @param onConnect optional callback that receives the connect future right after the connect has
 *                  been started and before this method waits on it; may be {@code null}
 * @param host      the remote host; brackets and zone id are stripped before building the address
 * @param port      the remote port
 * @return the new connection, or {@code null} if the connect, the SSL handshake, the HTTP upgrade
 *         or the handler lookup failed
 */
public NettyConnection createConnection(Consumer<ChannelFuture> onConnect, String host, int port) {
   final long sslHandshakeTimeoutMillis = 30000;

   InetSocketAddress remoteDestination;
   if (proxyEnabled && proxyRemoteDNS) {
      // createUnresolved() performs no local DNS lookup
      // NOTE(review): presumably so that the proxy resolves the name - confirm against the proxy handler setup
      remoteDestination = InetSocketAddress.createUnresolved(IPV6Util.stripBracketsAndZoneID(host), port);
   } else {
      remoteDestination = new InetSocketAddress(IPV6Util.stripBracketsAndZoneID(host), port);
   }
   logger.debug("Remote destination: {}", remoteDestination);

   ChannelFuture future;
   //port 0 does not work so only use local address if set
   if (localPort != 0) {
      SocketAddress localDestination;
      if (localAddress != null) {
         localDestination = new InetSocketAddress(localAddress, localPort);
      } else {
         localDestination = new InetSocketAddress(localPort);
      }
      future = bootstrap.connect(remoteDestination, localDestination);
   } else {
      future = bootstrap.connect(remoteDestination);
   }

   if (onConnect != null) {
      onConnect.accept(future);
   }

   future.awaitUninterruptibly();

   if (!future.isSuccess()) {
      Throwable t = future.cause();
      // ConnectException and NoRouteToHostException are deliberately not logged here
      if (t != null && !(t instanceof ConnectException) && !(t instanceof NoRouteToHostException)) {
         ActiveMQClientLogger.LOGGER.errorCreatingNettyConnection(t);
      }
      return null;
   }

   final Channel ch = future.channel();

   SslHandler sslHandler = ch.pipeline().get(SslHandler.class);
   if (sslHandler != null) {
      Future<Channel> handshakeFuture = sslHandler.handshakeFuture();
      if (!handshakeFuture.awaitUninterruptibly(sslHandshakeTimeoutMillis)) {
         // the handshake did not finish in time: close the channel and report it like every other failure
         ch.close().awaitUninterruptibly();
         ActiveMQClientLogger.LOGGER.errorCreatingNettyConnection(new IllegalStateException("SSL handshake was not completed in " + sslHandshakeTimeoutMillis + " ms while connecting to " + remoteDestination + " from Channel with id = " + ch.id()));
         return null;
      }
      if (!handshakeFuture.isSuccess()) {
         ch.close().awaitUninterruptibly();
         ActiveMQClientLogger.LOGGER.errorCreatingNettyConnection(handshakeFuture.cause());
         return null;
      }
      ChannelPipeline channelPipeline = ch.pipeline();
      ActiveMQChannelHandler channelHandler = channelPipeline.get(ActiveMQChannelHandler.class);
      if (!serverConnection) {
         if (channelHandler != null) {
            channelHandler.active = true;
         } else {
            ch.close().awaitUninterruptibly();
            ActiveMQClientLogger.LOGGER.errorCreatingNettyConnection(new IllegalStateException("No ActiveMQChannelHandler has been found while connecting to " + remoteDestination + " from Channel with id = " + ch.id()));
            return null;
         }
      }
   }

   if (httpUpgradeEnabled) {
      // Send a HTTP GET + Upgrade request that will be handled by the http-upgrade handler.
      try {
         //get this first in case it removes itself
         HttpUpgradeHandler httpUpgradeHandler = (HttpUpgradeHandler) ch.pipeline().get("http-upgrade");
         if (httpUpgradeHandler == null) {
            // without the handler the upgrade response could never be awaited; fail instead of hitting a NullPointerException
            ch.close().awaitUninterruptibly();
            ActiveMQClientLogger.LOGGER.errorCreatingNettyConnection(new IllegalStateException("No HttpUpgradeHandler has been found while connecting to " + remoteDestination + " from Channel with id = " + ch.id()));
            return null;
         }

         String scheme = "http";
         if (sslEnabled) {
            scheme = "https";
         }
         String ipv6Host = IPV6Util.encloseHost(host);
         URI uri = new URI(scheme, null, ipv6Host, port, null, null, null);
         HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri.getRawPath());
         request.headers().set(HttpHeaderNames.HOST, ipv6Host);
         request.headers().set(HttpHeaderNames.UPGRADE, ACTIVEMQ_REMOTING);
         request.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderNames.UPGRADE);

         final String serverName = ConfigurationHelper.getStringProperty(TransportConstants.ACTIVEMQ_SERVER_NAME, null, configuration);
         if (serverName != null) {
            request.headers().set(TransportConstants.ACTIVEMQ_SERVER_NAME, serverName);
         }

         final String endpoint = ConfigurationHelper.getStringProperty(TransportConstants.HTTP_UPGRADE_ENDPOINT_PROP_NAME, null, configuration);
         if (endpoint != null) {
            request.headers().set(TransportConstants.HTTP_UPGRADE_ENDPOINT_PROP_NAME, endpoint);
         }

         // Get a 16 byte nonce and base 64 encode it
         byte[] nonce = randomBytes(16);
         String key = base64(nonce);
         request.headers().set(SEC_ACTIVEMQ_REMOTING_KEY, key);
         // the key is also stored on the channel as the REMOTING_KEY attribute
         ch.attr(REMOTING_KEY).set(key);

         logger.debug("Sending HTTP request {}", request);

         // Send the HTTP request.
         ch.writeAndFlush(request);

         if (!httpUpgradeHandler.awaitHandshake()) {
            ch.close().awaitUninterruptibly();
            return null;
         }
      } catch (URISyntaxException e) {
         // the channel is already connected at this point, so it must be closed before giving up
         ch.close().awaitUninterruptibly();
         ActiveMQClientLogger.LOGGER.errorCreatingNettyConnection(e);
         return null;
      }
   } else {
      ChannelPipeline channelPipeline = ch.pipeline();
      ActiveMQChannelHandler channelHandler = channelPipeline.get(ActiveMQChannelHandler.class);
      if (channelHandler != null) {
         channelHandler.active = true;
      } else if (!serverConnection) {
         ch.close().awaitUninterruptibly();
         ActiveMQClientLogger.LOGGER.errorCreatingNettyConnection(
            new IllegalStateException("No ActiveMQChannelHandler has been found while connecting to " +
                                         remoteDestination + " from Channel with id = " + ch.id()));
         return null;
      }
   }

   // No acceptor on a client connection
   Listener connectionListener = new Listener();
   NettyConnection conn = new NettyConnection(configuration, ch, connectionListener, !httpEnabled && batchDelay > 0, false);
   connectionListener.connectionCreated(null, conn, protocolManager);
   return conn;
}