in artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java [526:780]
public synchronized void start() {
   // Builds the Netty client side of this connector: selects the transport (epoll, kqueue or NIO),
   // configures the Bootstrap socket options, resolves the SSL store settings, installs the channel
   // pipeline initializer and, when batchDelay > 0, schedules the batch flusher.
   // A non-null channelClazz means an earlier call already selected a transport, so this call is a no-op.
   if (channelClazz != null) {
      return;
   }

   if (remotingThreads == -1) {
      // Default to number of cores * 3
      remotingThreads = Runtime.getRuntime().availableProcessors() * 3;
   }

   // NOTE(review): channelClazz (the "already started" guard above) is assigned here, before the SSL
   // settings further down are resolved. If that resolution can throw, a later start() would return
   // early with a partially configured bootstrap - confirm whether that is acceptable.
   final String connectorType;
   if (useEpoll && CheckDependencies.isEpollAvailable()) {
      if (useGlobalWorkerPool) {
         group = SharedEventLoopGroup.getInstance((threadFactory -> new EpollEventLoopGroup(remotingThreads, threadFactory)));
      } else {
         group = new EpollEventLoopGroup(remotingThreads);
      }
      connectorType = EPOLL_CONNECTOR_TYPE;
      channelClazz = EpollSocketChannel.class;
      logger.debug("Connector {} using native epoll", this);
   } else if (useKQueue && CheckDependencies.isKQueueAvailable()) {
      if (useGlobalWorkerPool) {
         group = SharedEventLoopGroup.getInstance((threadFactory -> new KQueueEventLoopGroup(remotingThreads, threadFactory)));
      } else {
         group = new KQueueEventLoopGroup(remotingThreads);
      }
      connectorType = KQUEUE_CONNECTOR_TYPE;
      channelClazz = KQueueSocketChannel.class;
      logger.debug("Connector {} using native kqueue", this);
   } else {
      // Same shape as the native branches: create the group first, then record the channel class once.
      if (useGlobalWorkerPool) {
         group = SharedEventLoopGroup.getInstance((threadFactory -> new NioEventLoopGroup(remotingThreads, threadFactory)));
      } else {
         group = new NioEventLoopGroup(remotingThreads);
      }
      connectorType = NIO_CONNECTOR_TYPE;
      channelClazz = NioSocketChannel.class;
      logger.debug("Connector {} using nio", this);
   }

   bootstrap = new Bootstrap();
   bootstrap.channel(channelClazz);
   bootstrap.group(group);

   bootstrap.option(ChannelOption.TCP_NODELAY, tcpNoDelay);

   // -1 means "not configured": the corresponding option is then left unset on the bootstrap.
   if (connectTimeoutMillis != -1) {
      bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMillis);
   }
   if (tcpReceiveBufferSize != -1) {
      bootstrap.option(ChannelOption.SO_RCVBUF, tcpReceiveBufferSize);
   }
   if (tcpSendBufferSize != -1) {
      bootstrap.option(ChannelOption.SO_SNDBUF, tcpSendBufferSize);
   }

   // An unconfigured (-1) water mark falls back to Netty's WriteBufferWaterMark.DEFAULT value.
   final int lowWaterMark = this.writeBufferLowWaterMark != -1 ? this.writeBufferLowWaterMark : WriteBufferWaterMark.DEFAULT.low();
   final int highWaterMark = this.writeBufferHighWaterMark != -1 ? this.writeBufferHighWaterMark : WriteBufferWaterMark.DEFAULT.high();
   bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(lowWaterMark, highWaterMark));

   bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
   bootstrap.option(ChannelOption.SO_REUSEADDR, true);

   channelGroup = new DefaultChannelGroup("activemq-connector", GlobalEventExecutor.INSTANCE);

   // Effective SSL store settings. They are final because the channel initializer below captures them.
   final String realKeyStorePath;
   final String realKeyStoreProvider;
   final String realKeyStoreType;
   final String realKeyStorePassword;
   final String realKeyStoreAlias;
   final String realTrustStorePath;
   final String realTrustStoreProvider;
   final String realTrustStoreType;
   final String realTrustStorePassword;

   if (sslEnabled) {
      if (forceSSLParameters) {
         // Use the connector's own values verbatim; system properties are not consulted.
         realKeyStorePath = keyStorePath;
         realKeyStoreProvider = keyStoreProvider;
         realKeyStoreType = keyStoreType;
         realKeyStorePassword = keyStorePassword;
         realKeyStoreAlias = keyStoreAlias;
         realTrustStorePath = trustStorePath;
         realTrustStoreProvider = trustStoreProvider;
         realTrustStoreType = trustStoreType;
         realTrustStorePassword = trustStorePassword;
      } else {
         final String codecClass = resolveSslSetting(useDefaultSslContext, passwordCodecClass, ACTIVEMQ_SSL_PASSWORD_CODEC_CLASS_PROP_NAME);

         realKeyStorePath = resolveSslSetting(useDefaultSslContext, keyStorePath, ACTIVEMQ_KEYSTORE_PATH_PROP_NAME, JAVAX_KEYSTORE_PATH_PROP_NAME);

         String resolvedKeyStorePassword = resolveSslSetting(useDefaultSslContext, keyStorePassword, ACTIVEMQ_KEYSTORE_PASSWORD_PROP_NAME, JAVAX_KEYSTORE_PASSWORD_PROP_NAME);
         // Only a password that differs from the configured one is passed through processSslPasswordProperty.
         if (resolvedKeyStorePassword != null && !resolvedKeyStorePassword.equals(keyStorePassword)) {
            resolvedKeyStorePassword = processSslPasswordProperty(resolvedKeyStorePassword, codecClass);
         }
         realKeyStorePassword = resolvedKeyStorePassword;

         realKeyStoreAlias = keyStoreAlias;

         Pair<String, String> keyStoreCompat = SSLSupport.getValidProviderAndType(
            resolveSslSetting(useDefaultSslContext, keyStoreProvider, ACTIVEMQ_KEYSTORE_PROVIDER_PROP_NAME, JAVAX_KEYSTORE_PROVIDER_PROP_NAME),
            resolveSslSetting(useDefaultSslContext, keyStoreType, ACTIVEMQ_KEYSTORE_TYPE_PROP_NAME, JAVAX_KEYSTORE_TYPE_PROP_NAME));
         realKeyStoreProvider = keyStoreCompat.getA();
         realKeyStoreType = keyStoreCompat.getB();

         realTrustStorePath = resolveSslSetting(useDefaultSslContext, trustStorePath, ACTIVEMQ_TRUSTSTORE_PATH_PROP_NAME, JAVAX_TRUSTSTORE_PATH_PROP_NAME);

         String resolvedTrustStorePassword = resolveSslSetting(useDefaultSslContext, trustStorePassword, ACTIVEMQ_TRUSTSTORE_PASSWORD_PROP_NAME, JAVAX_TRUSTSTORE_PASSWORD_PROP_NAME);
         if (resolvedTrustStorePassword != null && !resolvedTrustStorePassword.equals(trustStorePassword)) {
            resolvedTrustStorePassword = processSslPasswordProperty(resolvedTrustStorePassword, codecClass);
         }
         realTrustStorePassword = resolvedTrustStorePassword;

         Pair<String, String> trustStoreCompat = SSLSupport.getValidProviderAndType(
            resolveSslSetting(useDefaultSslContext, trustStoreProvider, ACTIVEMQ_TRUSTSTORE_PROVIDER_PROP_NAME, JAVAX_TRUSTSTORE_PROVIDER_PROP_NAME),
            resolveSslSetting(useDefaultSslContext, trustStoreType, ACTIVEMQ_TRUSTSTORE_TYPE_PROP_NAME, JAVAX_TRUSTSTORE_TYPE_PROP_NAME));
         realTrustStoreProvider = trustStoreCompat.getA();
         realTrustStoreType = trustStoreCompat.getB();
      }
   } else {
      realKeyStorePath = null;
      realKeyStoreProvider = null;
      realKeyStoreType = null;
      realKeyStorePassword = null;
      realKeyStoreAlias = null;
      realTrustStorePath = null;
      realTrustStoreProvider = null;
      realTrustStoreType = null;
      realTrustStorePassword = null;
   }

   // Pipeline order per channel: SOCKS proxy, SSL, HTTP tunnelling, HTTP upgrade, protocol handlers,
   // and finally the ActiveMQ client channel handler.
   bootstrap.handler(new ChannelInitializer<>() {
      @Override
      public void initChannel(Channel channel) throws Exception {
         final ChannelPipeline pipeline = channel.pipeline();

         if (proxyEnabled && (proxyRemoteDNS || !isTargetLocalHost())) {
            InetSocketAddress proxyAddress = new InetSocketAddress(proxyHost, proxyPort);
            ProxyHandler proxyHandler = switch (proxyVersion) {
               case SOCKS5 -> new Socks5ProxyHandler(proxyAddress, proxyUsername, proxyPassword);
               case SOCKS4a -> new Socks4ProxyHandler(proxyAddress, proxyUsername);
               default -> throw new IllegalArgumentException("Unknown SOCKS proxy version");
            };

            pipeline.addLast(proxyHandler);

            logger.debug("Using a SOCKS proxy at {}:{}", proxyHost, proxyPort);

            if (proxyRemoteDNS) {
               // NOTE(review): this reconfigures the shared bootstrap from inside per-channel
               // initialisation - confirm it takes effect before the first address is resolved.
               bootstrap.resolver(NoopAddressResolverGroup.INSTANCE);
            }
         }

         if (sslEnabled && !useServlet) {
            final SSLContextConfig sslContextConfig = SSLContextConfig.builder()
               .keystoreProvider(realKeyStoreProvider)
               .keystorePath(realKeyStorePath)
               .keystoreType(realKeyStoreType)
               .keystorePassword(realKeyStorePassword)
               .keystoreAlias(realKeyStoreAlias)
               .truststoreProvider(realTrustStoreProvider)
               .truststorePath(realTrustStorePath)
               .truststoreType(realTrustStoreType)
               .truststorePassword(realTrustStorePassword)
               .trustManagerFactoryPlugin(trustManagerFactoryPlugin)
               .crlPath(crlPath)
               .trustAll(trustAll)
               .build();

            final SSLEngine engine;
            if (sslProvider.equals(TransportConstants.OPENSSL_PROVIDER)) {
               engine = loadOpenSslEngine(channel.alloc(), sslContextConfig);
            } else {
               engine = loadJdkSslEngine(sslContextConfig);
            }

            engine.setUseClientMode(true);

            engine.setWantClientAuth(true);

            // setting the enabled cipher suites resets the enabled protocols so we need
            // to save the enabled protocols so that after the custom cipher suite is enabled
            // we can reset the enabled protocols if a custom protocol isn't specified
            String[] originalProtocols = engine.getEnabledProtocols();

            if (enabledCipherSuites != null) {
               try {
                  engine.setEnabledCipherSuites(SSLSupport.parseCommaSeparatedListIntoArray(enabledCipherSuites));
               } catch (IllegalArgumentException e) {
                  // Log the suites the engine does support, then let the failure propagate.
                  ActiveMQClientLogger.LOGGER.invalidCipherSuite(SSLSupport.parseArrayIntoCommandSeparatedList(engine.getSupportedCipherSuites()));
                  throw e;
               }
            }

            if (enabledProtocols != null) {
               try {
                  engine.setEnabledProtocols(SSLSupport.parseCommaSeparatedListIntoArray(enabledProtocols));
               } catch (IllegalArgumentException e) {
                  // Log the protocols the engine does support, then let the failure propagate.
                  ActiveMQClientLogger.LOGGER.invalidProtocol(SSLSupport.parseArrayIntoCommandSeparatedList(engine.getSupportedProtocols()));
                  throw e;
               }
            } else {
               engine.setEnabledProtocols(originalProtocols);
            }

            if (verifyHost) {
               SSLParameters sslParameters = engine.getSSLParameters();
               sslParameters.setEndpointIdentificationAlgorithm("HTTPS");
               engine.setSSLParameters(sslParameters);
            }

            if (sniHost != null) {
               SSLParameters sslParameters = engine.getSSLParameters();
               sslParameters.setServerNames(Arrays.asList(new SNIHostName(sniHost)));
               engine.setSSLParameters(sslParameters);
            }

            // Named sslHandler so it does not shadow the connector's own "handler" used further down.
            SslHandler sslHandler = new SslHandler(engine);

            pipeline.addLast("ssl", sslHandler);
         }

         if (httpEnabled) {
            pipeline.addLast(new HttpRequestEncoder());

            pipeline.addLast(new HttpResponseDecoder());

            pipeline.addLast(new HttpObjectAggregator(Integer.MAX_VALUE));

            pipeline.addLast(new HttpHandler(httpHeaders));
         }

         if (httpUpgradeEnabled) {
            // prepare to handle a HTTP 101 response to upgrade the protocol.
            final HttpClientCodec httpClientCodec = new HttpClientCodec();
            pipeline.addLast(httpClientCodec);
            pipeline.addLast("http-upgrade", new HttpUpgradeHandler(pipeline, httpClientCodec));
         }

         if (protocolManager != null) {
            protocolManager.addChannelHandlers(pipeline);
         }

         if (handler != null) {
            pipeline.addLast(new ActiveMQClientChannelHandler(channelGroup, handler, new Listener(), closeExecutor));
            logger.debug("Added ActiveMQClientChannelHandler to Channel with id = {} ", channel.id());
         }
      }
   });

   if (batchDelay > 0) {
      flusher = new BatchFlusher();

      batchFlusherFuture = scheduledThreadPool.scheduleWithFixedDelay(flusher, batchDelay, batchDelay, TimeUnit.MILLISECONDS);
   }

   logger.debug("Started {} Netty Connector version {} to {}:{}", connectorType, TransportConstants.NETTY_VERSION, host, port);
}

/**
 * Picks the effective value of a single SSL setting.
 * <p>
 * When {@code preferConfigured} is {@code true} the configured value is returned as-is and no system
 * property is read. Otherwise the named system properties are checked in the given order and the first
 * one that is set wins; if none is set the configured value is used.
 *
 * @param preferConfigured    whether the configured value takes precedence over every system property
 * @param configuredValue     the value configured on this connector; may be {@code null}
 * @param systemPropertyNames names of the system properties to consult, highest priority first
 * @return the resolved value, or {@code null} when neither a system property nor a configured value exists
 */
private static String resolveSslSetting(boolean preferConfigured, String configuredValue, String... systemPropertyNames) {
   if (preferConfigured) {
      return configuredValue;
   }
   return Stream.of(systemPropertyNames)
      .map(System::getProperty)
      .filter(Objects::nonNull)
      .findFirst()
      .orElse(configuredValue);
}