public NettyConnection createConnection()

in artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java [877:1007]


   /**
    * Connects to {@code host}:{@code port} through the connector's bootstrap and, once every negotiation step visible
    * here has completed, wraps the resulting channel in a {@link NettyConnection}.
    * <p>
    * The call blocks the calling thread: it waits uninterruptibly for the connect attempt, then (when an
    * {@code SslHandler} is present in the pipeline) for the SSL handshake for up to 30 seconds, then (when
    * {@code httpUpgradeEnabled} is set) for the HTTP upgrade handshake.
    * <p>
    * On every failure after the channel has been opened the channel is closed before {@code null} is returned, so
    * callers never have to clean up a half-initialised channel.
    *
    * @param onConnect optional callback that receives the connect future right after the connect attempt has been
    *                  started and before this method waits on it; may be {@code null}
    * @param host      remote host; it is passed through {@code IPV6Util.stripBracketsAndZoneID} to build the socket
    *                  address and through {@code IPV6Util.encloseHost} for the HTTP upgrade request
    * @param port      remote port
    * @return the newly created connection, or {@code null} when connecting, the SSL handshake, the HTTP upgrade or
    *         the lookup of the {@code ActiveMQChannelHandler} failed
    */
   public NettyConnection createConnection(Consumer<ChannelFuture> onConnect, String host, int port) {
      InetSocketAddress remoteDestination;
      if (proxyEnabled && proxyRemoteDNS) {
         // The address is deliberately left unresolved here - presumably so that the proxy does the DNS lookup.
         remoteDestination = InetSocketAddress.createUnresolved(IPV6Util.stripBracketsAndZoneID(host), port);
      } else {
         remoteDestination = new InetSocketAddress(IPV6Util.stripBracketsAndZoneID(host), port);
      }

      logger.debug("Remote destination: {}", remoteDestination);

      ChannelFuture future;
      //port 0 does not work so only use local address if set
      if (localPort != 0) {
         SocketAddress localDestination;
         if (localAddress != null) {
            localDestination = new InetSocketAddress(localAddress, localPort);
         } else {
            localDestination = new InetSocketAddress(localPort);
         }
         future = bootstrap.connect(remoteDestination, localDestination);
      } else {
         future = bootstrap.connect(remoteDestination);
      }
      if (onConnect != null) {
         onConnect.accept(future);
      }
      future.awaitUninterruptibly();

      if (!future.isSuccess()) {
         Throwable t = future.cause();

         // ConnectException and NoRouteToHostException are not logged by this method
         if (t != null && !(t instanceof ConnectException) && !(t instanceof NoRouteToHostException)) {
            ActiveMQClientLogger.LOGGER.errorCreatingNettyConnection(t);
         }

         return null;
      }

      final Channel ch = future.channel();
      SslHandler sslHandler = ch.pipeline().get(SslHandler.class);
      if (sslHandler != null) {
         final long sslHandshakeTimeoutMillis = 30000;
         Future<Channel> handshakeFuture = sslHandler.handshakeFuture();
         if (handshakeFuture.awaitUninterruptibly(sslHandshakeTimeoutMillis)) {
            if (handshakeFuture.isSuccess()) {
               ChannelPipeline channelPipeline = ch.pipeline();
               ActiveMQChannelHandler channelHandler = channelPipeline.get(ActiveMQChannelHandler.class);
               if (!serverConnection) {
                  if (channelHandler != null) {
                     channelHandler.active = true;
                  } else {
                     ch.close().awaitUninterruptibly();
                     ActiveMQClientLogger.LOGGER.errorCreatingNettyConnection(new IllegalStateException("No ActiveMQChannelHandler has been found while connecting to " + remoteDestination + " from Channel with id = " + ch.id()));
                     return null;
                  }
               }
            } else {
               ch.close().awaitUninterruptibly();
               ActiveMQClientLogger.LOGGER.errorCreatingNettyConnection(handshakeFuture.cause());
               return null;
            }
         } else {
            // The handshake did not finish in time: report it instead of failing silently, like the other
            // handshake failure path does.
            ch.close().awaitUninterruptibly();
            ActiveMQClientLogger.LOGGER.errorCreatingNettyConnection(new javax.net.ssl.SSLException("Handshake was not completed in 30 seconds"));
            return null;
         }

      }
      if (httpUpgradeEnabled) {
         // Send a HTTP GET + Upgrade request that will be handled by the http-upgrade handler.
         try {
            // get this first in case it removes itself
            HttpUpgradeHandler httpUpgradeHandler = (HttpUpgradeHandler) ch.pipeline().get("http-upgrade");
            String scheme = "http";
            if (sslEnabled) {
               scheme = "https";
            }
            String ipv6Host = IPV6Util.encloseHost(host);
            URI uri = new URI(scheme, null, ipv6Host, port, null, null, null);
            HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri.getRawPath());
            request.headers().set(HttpHeaderNames.HOST, ipv6Host);
            request.headers().set(HttpHeaderNames.UPGRADE, ACTIVEMQ_REMOTING);
            request.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderNames.UPGRADE);
            final String serverName = ConfigurationHelper.getStringProperty(TransportConstants.ACTIVEMQ_SERVER_NAME, null, configuration);
            if (serverName != null) {
               request.headers().set(TransportConstants.ACTIVEMQ_SERVER_NAME, serverName);
            }

            final String endpoint = ConfigurationHelper.getStringProperty(TransportConstants.HTTP_UPGRADE_ENDPOINT_PROP_NAME, null, configuration);
            if (endpoint != null) {
               request.headers().set(TransportConstants.HTTP_UPGRADE_ENDPOINT_PROP_NAME, endpoint);
            }

            // Generate the nonce with randomBytes(16) and base 64 encode it
            byte[] nonce = randomBytes(16);
            String key = base64(nonce);
            request.headers().set(SEC_ACTIVEMQ_REMOTING_KEY, key);
            // The key is also stored on the channel - presumably for the upgrade handler to verify the response.
            ch.attr(REMOTING_KEY).set(key);

            logger.debug("Sending HTTP request {}", request);

            // Send the HTTP request.
            ch.writeAndFlush(request);

            if (!httpUpgradeHandler.awaitHandshake()) {
               ch.close().awaitUninterruptibly();
               return null;
            }
         } catch (URISyntaxException e) {
            // The channel is already connected at this point, so it has to be closed or it would leak.
            ch.close().awaitUninterruptibly();
            ActiveMQClientLogger.LOGGER.errorCreatingNettyConnection(e);
            return null;
         }
      } else {
         ChannelPipeline channelPipeline = ch.pipeline();
         ActiveMQChannelHandler channelHandler = channelPipeline.get(ActiveMQChannelHandler.class);
         if (channelHandler != null) {
            channelHandler.active = true;
         } else if (!serverConnection) {
            ch.close().awaitUninterruptibly();
            ActiveMQClientLogger.LOGGER.errorCreatingNettyConnection(
               new IllegalStateException("No ActiveMQChannelHandler has been found while connecting to " +
                                            remoteDestination + " from Channel with id = " + ch.id()));
            return null;
         }
      }

      // No acceptor on a client connection
      Listener connectionListener = new Listener();
      NettyConnection conn = new NettyConnection(configuration, ch, connectionListener, !httpEnabled && batchDelay > 0, false);
      connectionListener.connectionCreated(null, conn, protocolManager);
      return conn;
   }