public synchronized void start()

in artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java [513:773]


   public synchronized void start() {
      if (channelClazz != null) {
         return;
      }

      if (remotingThreads == -1) {
         // Default to number of cores * 3
         remotingThreads = Runtime.getRuntime().availableProcessors() * 3;
      }

      String connectorType;

      if (useEpoll && CheckDependencies.isEpollAvailable()) {
         if (useGlobalWorkerPool) {
            group = SharedEventLoopGroup.getInstance((threadFactory -> new EpollEventLoopGroup(remotingThreads, threadFactory)));
         } else {
            group = new EpollEventLoopGroup(remotingThreads);
         }
         connectorType = EPOLL_CONNECTOR_TYPE;
         channelClazz = EpollSocketChannel.class;
         logger.debug("Connector {} using native epoll", this);
      } else if (useKQueue && CheckDependencies.isKQueueAvailable()) {
         if (useGlobalWorkerPool) {
            group = SharedEventLoopGroup.getInstance((threadFactory -> new KQueueEventLoopGroup(remotingThreads, threadFactory)));
         } else {
            group = new KQueueEventLoopGroup(remotingThreads);
         }
         connectorType = KQUEUE_CONNECTOR_TYPE;
         channelClazz = KQueueSocketChannel.class;
         logger.debug("Connector {} using native kqueue", this);
      } else {
         if (useGlobalWorkerPool) {
            channelClazz = NioSocketChannel.class;
            group = SharedEventLoopGroup.getInstance((threadFactory -> new NioEventLoopGroup(remotingThreads, threadFactory)));
         } else {
            channelClazz = NioSocketChannel.class;
            group = new NioEventLoopGroup(remotingThreads);
         }
         connectorType = NIO_CONNECTOR_TYPE;
         channelClazz = NioSocketChannel.class;
         logger.debug("Connector {} using nio", this);
      }
      // if we are a servlet wrap the socketChannelFactory

      bootstrap = new Bootstrap();
      bootstrap.channel(channelClazz);
      bootstrap.group(group);

      bootstrap.option(ChannelOption.TCP_NODELAY, tcpNoDelay);

      if (connectTimeoutMillis != -1) {
         bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMillis);
      }
      if (tcpReceiveBufferSize != -1) {
         bootstrap.option(ChannelOption.SO_RCVBUF, tcpReceiveBufferSize);
      }
      if (tcpSendBufferSize != -1) {
         bootstrap.option(ChannelOption.SO_SNDBUF, tcpSendBufferSize);
      }
      final int writeBufferLowWaterMark = this.writeBufferLowWaterMark != -1 ? this.writeBufferLowWaterMark : WriteBufferWaterMark.DEFAULT.low();
      final int writeBufferHighWaterMark = this.writeBufferHighWaterMark != -1 ? this.writeBufferHighWaterMark : WriteBufferWaterMark.DEFAULT.high();
      final WriteBufferWaterMark writeBufferWaterMark = new WriteBufferWaterMark(writeBufferLowWaterMark, writeBufferHighWaterMark);
      bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK, writeBufferWaterMark);
      bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
      bootstrap.option(ChannelOption.SO_REUSEADDR, true);
      channelGroup = new DefaultChannelGroup("activemq-connector", GlobalEventExecutor.INSTANCE);

      final String realKeyStorePath;
      final String realKeyStoreProvider;
      final String realKeyStoreType;
      final String realKeyStorePassword;
      final String realKeyStoreAlias;
      final String realTrustStorePath;
      final String realTrustStoreProvider;
      final String realTrustStoreType;
      final String realTrustStorePassword;

      if (sslEnabled) {
         if (forceSSLParameters) {
            realKeyStorePath = keyStorePath;
            realKeyStoreProvider = keyStoreProvider;
            realKeyStoreType = keyStoreType;
            realKeyStorePassword = keyStorePassword;
            realKeyStoreAlias = keyStoreAlias;
            realTrustStorePath = trustStorePath;
            realTrustStoreProvider = trustStoreProvider;
            realTrustStoreType = trustStoreType;
            realTrustStorePassword = trustStorePassword;
         } else {
            String tempPasswordCodecClass = Stream.of(System.getProperty(ACTIVEMQ_SSL_PASSWORD_CODEC_CLASS_PROP_NAME), passwordCodecClass).map(v -> useDefaultSslContext ? passwordCodecClass : v).filter(Objects::nonNull).findFirst().orElse(null);

            realKeyStorePath = Stream.of(System.getProperty(ACTIVEMQ_KEYSTORE_PATH_PROP_NAME), System.getProperty(JAVAX_KEYSTORE_PATH_PROP_NAME), keyStorePath).map(v -> useDefaultSslContext ? keyStorePath : v).filter(Objects::nonNull).findFirst().orElse(null);
            String tempKeyStorePassword = Stream.of(System.getProperty(ACTIVEMQ_KEYSTORE_PASSWORD_PROP_NAME), System.getProperty(JAVAX_KEYSTORE_PASSWORD_PROP_NAME), keyStorePassword).map(v -> useDefaultSslContext ? keyStorePassword : v).filter(Objects::nonNull).findFirst().orElse(null);
            if (tempKeyStorePassword != null && !tempKeyStorePassword.equals(keyStorePassword)) {
               tempKeyStorePassword = processSslPasswordProperty(tempKeyStorePassword, tempPasswordCodecClass);
            }
            realKeyStorePassword = tempKeyStorePassword;
            realKeyStoreAlias = keyStoreAlias;

            Pair<String, String> keyStoreCompat = SSLSupport.getValidProviderAndType(Stream.of(System.getProperty(ACTIVEMQ_KEYSTORE_PROVIDER_PROP_NAME), System.getProperty(JAVAX_KEYSTORE_PROVIDER_PROP_NAME), keyStoreProvider).map(v -> useDefaultSslContext ? keyStoreProvider : v).filter(Objects::nonNull).findFirst().orElse(null),
                                                                                     Stream.of(System.getProperty(ACTIVEMQ_KEYSTORE_TYPE_PROP_NAME), System.getProperty(JAVAX_KEYSTORE_TYPE_PROP_NAME), keyStoreType).map(v -> useDefaultSslContext ? keyStoreType : v).filter(Objects::nonNull).findFirst().orElse(null));
            realKeyStoreProvider = keyStoreCompat.getA();
            realKeyStoreType = keyStoreCompat.getB();

            realTrustStorePath = Stream.of(System.getProperty(ACTIVEMQ_TRUSTSTORE_PATH_PROP_NAME), System.getProperty(JAVAX_TRUSTSTORE_PATH_PROP_NAME), trustStorePath).map(v -> useDefaultSslContext ? trustStorePath : v).filter(Objects::nonNull).findFirst().orElse(null);
            String tempTrustStorePassword = Stream.of(System.getProperty(ACTIVEMQ_TRUSTSTORE_PASSWORD_PROP_NAME), System.getProperty(JAVAX_TRUSTSTORE_PASSWORD_PROP_NAME), trustStorePassword).map(v -> useDefaultSslContext ? trustStorePassword : v).filter(Objects::nonNull).findFirst().orElse(null);
            if (tempTrustStorePassword != null && !tempTrustStorePassword.equals(trustStorePassword)) {
               tempTrustStorePassword = processSslPasswordProperty(tempTrustStorePassword, tempPasswordCodecClass);
            }
            realTrustStorePassword = tempTrustStorePassword;

            Pair<String, String> trustStoreCompat = SSLSupport.getValidProviderAndType(Stream.of(System.getProperty(ACTIVEMQ_TRUSTSTORE_PROVIDER_PROP_NAME), System.getProperty(JAVAX_TRUSTSTORE_PROVIDER_PROP_NAME), trustStoreProvider).map(v -> useDefaultSslContext ? trustStoreProvider : v).filter(Objects::nonNull).findFirst().orElse(null),
                                                                                       Stream.of(System.getProperty(ACTIVEMQ_TRUSTSTORE_TYPE_PROP_NAME), System.getProperty(JAVAX_TRUSTSTORE_TYPE_PROP_NAME), trustStoreType).map(v -> useDefaultSslContext ? trustStoreType : v).filter(Objects::nonNull).findFirst().orElse(null));
            realTrustStoreProvider = trustStoreCompat.getA();
            realTrustStoreType = trustStoreCompat.getB();
         }
      } else {
         realKeyStorePath = null;
         realKeyStoreProvider = null;
         realKeyStoreType = null;
         realKeyStorePassword = null;
         realKeyStoreAlias = null;
         realTrustStorePath = null;
         realTrustStoreProvider = null;
         realTrustStoreType = null;
         realTrustStorePassword = null;
      }

      bootstrap.handler(new ChannelInitializer<Channel>() {
         @Override
         public void initChannel(Channel channel) throws Exception {
            final ChannelPipeline pipeline = channel.pipeline();

            if (proxyEnabled && (proxyRemoteDNS || !isTargetLocalHost())) {
               InetSocketAddress proxyAddress = new InetSocketAddress(proxyHost, proxyPort);
               ProxyHandler proxyHandler;
               switch (proxyVersion) {
                  case SOCKS5:
                     proxyHandler = new Socks5ProxyHandler(proxyAddress, proxyUsername, proxyPassword);
                     break;
                  case SOCKS4a:
                     proxyHandler = new Socks4ProxyHandler(proxyAddress, proxyUsername);
                     break;
                  default:
                     throw new IllegalArgumentException("Unknown SOCKS proxy version");
               }

               channel.pipeline().addLast(proxyHandler);

               logger.debug("Using a SOCKS proxy at {}:{}", proxyHost, proxyPort);

               if (proxyRemoteDNS) {
                  bootstrap.resolver(NoopAddressResolverGroup.INSTANCE);
               }
            }

            if (sslEnabled && !useServlet) {

               final SSLContextConfig sslContextConfig = SSLContextConfig.builder()
                  .keystoreProvider(realKeyStoreProvider)
                  .keystorePath(realKeyStorePath)
                  .keystoreType(realKeyStoreType)
                  .keystorePassword(realKeyStorePassword)
                  .keystoreAlias(realKeyStoreAlias)
                  .truststoreProvider(realTrustStoreProvider)
                  .truststorePath(realTrustStorePath)
                  .truststoreType(realTrustStoreType)
                  .truststorePassword(realTrustStorePassword)
                  .trustManagerFactoryPlugin(trustManagerFactoryPlugin)
                  .crlPath(crlPath)
                  .trustAll(trustAll)
                  .build();

               final SSLEngine engine;
               if (sslProvider.equals(TransportConstants.OPENSSL_PROVIDER)) {
                  engine = loadOpenSslEngine(channel.alloc(), sslContextConfig);
               } else {
                  engine = loadJdkSslEngine(sslContextConfig);
               }

               engine.setUseClientMode(true);

               engine.setWantClientAuth(true);

               // setting the enabled cipher suites resets the enabled protocols so we need
               // to save the enabled protocols so that after the customer cipher suite is enabled
               // we can reset the enabled protocols if a customer protocol isn't specified
               String[] originalProtocols = engine.getEnabledProtocols();

               if (enabledCipherSuites != null) {
                  try {
                     engine.setEnabledCipherSuites(SSLSupport.parseCommaSeparatedListIntoArray(enabledCipherSuites));
                  } catch (IllegalArgumentException e) {
                     ActiveMQClientLogger.LOGGER.invalidCipherSuite(SSLSupport.parseArrayIntoCommandSeparatedList(engine.getSupportedCipherSuites()));
                     throw e;
                  }
               }

               if (enabledProtocols != null) {
                  try {
                     engine.setEnabledProtocols(SSLSupport.parseCommaSeparatedListIntoArray(enabledProtocols));
                  } catch (IllegalArgumentException e) {
                     ActiveMQClientLogger.LOGGER.invalidProtocol(SSLSupport.parseArrayIntoCommandSeparatedList(engine.getSupportedProtocols()));
                     throw e;
                  }
               } else {
                  engine.setEnabledProtocols(originalProtocols);
               }

               if (verifyHost) {
                  SSLParameters sslParameters = engine.getSSLParameters();
                  sslParameters.setEndpointIdentificationAlgorithm("HTTPS");
                  engine.setSSLParameters(sslParameters);
               }

               if (sniHost != null) {
                  SSLParameters sslParameters = engine.getSSLParameters();
                  sslParameters.setServerNames(Arrays.asList(new SNIHostName(sniHost)));
                  engine.setSSLParameters(sslParameters);
               }

               SslHandler handler = new SslHandler(engine);

               pipeline.addLast("ssl", handler);
            }

            if (httpEnabled) {
               pipeline.addLast(new HttpRequestEncoder());

               pipeline.addLast(new HttpResponseDecoder());

               pipeline.addLast(new HttpObjectAggregator(Integer.MAX_VALUE));

               pipeline.addLast(new HttpHandler());
            }

            if (httpUpgradeEnabled) {
               // prepare to handle a HTTP 101 response to upgrade the protocol.
               final HttpClientCodec httpClientCodec = new HttpClientCodec();
               pipeline.addLast(httpClientCodec);
               pipeline.addLast("http-upgrade", new HttpUpgradeHandler(pipeline, httpClientCodec));
            }

            if (protocolManager != null) {
               protocolManager.addChannelHandlers(pipeline);
            }

            if (handler != null) {
               pipeline.addLast(new ActiveMQClientChannelHandler(channelGroup, handler, new Listener(), closeExecutor));
               logger.debug("Added ActiveMQClientChannelHandler to Channel with id = {} ", channel.id());
            }
         }
      });

      if (batchDelay > 0) {
         flusher = new BatchFlusher();

         batchFlusherFuture = scheduledThreadPool.scheduleWithFixedDelay(flusher, batchDelay, batchDelay, TimeUnit.MILLISECONDS);
      }
      logger.debug("Started {} Netty Connector version {} to {}:{}", connectorType, TransportConstants.NETTY_VERSION, host, port);
   }