public synchronized void start()

in artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java [413:552]


   public synchronized void start() throws Exception {
      if (channelClazz != null) {
         // Already started
         return;
      }

      String acceptorType;

      if (useInvm) {
         acceptorType = INVM_ACCEPTOR_TYPE;
         channelClazz = LocalServerChannel.class;
         eventLoopGroup = new DefaultEventLoopGroup();
      } else {

         if (remotingThreads == -1) {
            // Default to number of cores * 3
            remotingThreads = Runtime.getRuntime().availableProcessors() * 3;
         }

         if (useEpoll && CheckDependencies.isEpollAvailable()) {
            channelClazz = EpollServerSocketChannel.class;
            eventLoopGroup = new EpollEventLoopGroup(remotingThreads, AccessController.doPrivileged(new PrivilegedAction<ActiveMQThreadFactory>() {
               @Override
               public ActiveMQThreadFactory run() {
                  return new ActiveMQThreadFactory("activemq-netty-threads", true, ClientSessionFactoryImpl.class.getClassLoader());
               }
            }));
            acceptorType = EPOLL_ACCEPTOR_TYPE;

            logger.debug("Acceptor using native epoll");
         } else if (useKQueue && CheckDependencies.isKQueueAvailable()) {
            channelClazz = KQueueServerSocketChannel.class;
            eventLoopGroup = new KQueueEventLoopGroup(remotingThreads, AccessController.doPrivileged(new PrivilegedAction<ActiveMQThreadFactory>() {
               @Override
               public ActiveMQThreadFactory run() {
                  return new ActiveMQThreadFactory("activemq-netty-threads", true, ClientSessionFactoryImpl.class.getClassLoader());
               }
            }));
            acceptorType = KQUEUE_ACCEPTOR_TYPE;

            logger.debug("Acceptor using native kqueue");
         } else {
            channelClazz = NioServerSocketChannel.class;
            eventLoopGroup = new NioEventLoopGroup(remotingThreads, AccessController.doPrivileged(new PrivilegedAction<ActiveMQThreadFactory>() {
               @Override
               public ActiveMQThreadFactory run() {
                  return new ActiveMQThreadFactory("activemq-netty-threads", true, ClientSessionFactoryImpl.class.getClassLoader());
               }
            }));
            acceptorType = NIO_ACCEPTOR_TYPE;
            logger.debug("Acceptor using nio");
         }
      }

      bootstrap = new ServerBootstrap();
      bootstrap.group(eventLoopGroup);
      bootstrap.channel(channelClazz);

      ChannelInitializer<Channel> factory = new ChannelInitializer<Channel>() {
         @Override
         public void initChannel(Channel channel) throws Exception {
            ChannelPipeline pipeline = channel.pipeline();
            if (sslEnabled) {
               final Pair<String, Integer> peerInfo = getPeerInfo(channel);
               try {
                  pipeline.addLast("sni", new NettySNIHostnameHandler());
                  pipeline.addLast("ssl", getSslHandler(channel.alloc(), peerInfo.getA(), peerInfo.getB()));
                  pipeline.addLast("sslHandshakeExceptionHandler", new SslHandshakeExceptionHandler());
               } catch (Exception e) {
                  Throwable rootCause = ExceptionUtils.getRootCause(e);
                  ActiveMQServerLogger.LOGGER.gettingSslHandlerFailed(channel.remoteAddress().toString(), rootCause.getClass().getName() + ": " + rootCause.getMessage());

                  logger.debug("Getting SSL handler failed", e);
                  throw e;
               }
            }
            pipeline.addLast(protocolHandler.getProtocolDecoder());
         }

         private Pair<String, Integer> getPeerInfo(Channel channel) {
            try {
               String[] peerInfo = channel.remoteAddress().toString().replace("/", "").split(":");
               return new Pair<>(peerInfo[0], Integer.parseInt(peerInfo[1]));
            } catch (Exception e) {
               logger.debug("Failed to parse peer info for SSL engine initialization", e);
            }

            return new Pair<>(null, 0);
         }
      };
      bootstrap.childHandler(factory);

      // Bind
      bootstrap.childOption(ChannelOption.TCP_NODELAY, tcpNoDelay);
      if (tcpReceiveBufferSize != -1) {
         bootstrap.childOption(ChannelOption.SO_RCVBUF, tcpReceiveBufferSize);
      }
      if (tcpSendBufferSize != -1) {
         bootstrap.childOption(ChannelOption.SO_SNDBUF, tcpSendBufferSize);
      }
      final int writeBufferLowWaterMark = this.writeBufferLowWaterMark != -1 ? this.writeBufferLowWaterMark : WriteBufferWaterMark.DEFAULT.low();
      final int writeBufferHighWaterMark = this.writeBufferHighWaterMark != -1 ? this.writeBufferHighWaterMark : WriteBufferWaterMark.DEFAULT.high();
      final WriteBufferWaterMark writeBufferWaterMark = new WriteBufferWaterMark(writeBufferLowWaterMark, writeBufferHighWaterMark);
      bootstrap.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, writeBufferWaterMark);
      if (backlog != -1) {
         bootstrap.option(ChannelOption.SO_BACKLOG, backlog);
      }
      bootstrap.option(ChannelOption.SO_REUSEADDR, true);
      bootstrap.childOption(ChannelOption.SO_REUSEADDR, true);
      bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
      channelGroup = new DefaultChannelGroup("activemq-accepted-channels", GlobalEventExecutor.INSTANCE);

      serverChannelGroup = new DefaultChannelGroup("activemq-acceptor-channels", GlobalEventExecutor.INSTANCE);

      if (httpUpgradeEnabled) {
         // the channel will be bound by the Web container and hand over after the HTTP Upgrade
         // handshake is successful
      } else {
         startServerChannels();

         paused = false;

         if (notificationService != null) {
            TypedProperties props = new TypedProperties();
            props.putSimpleStringProperty(new SimpleString("factory"), new SimpleString(NettyAcceptorFactory.class.getName()));
            props.putSimpleStringProperty(new SimpleString("host"), new SimpleString(host));
            props.putIntProperty(new SimpleString("port"), actualPort);
            Notification notification = new Notification(null, CoreNotificationType.ACCEPTOR_STARTED, props);
            notificationService.sendNotification(notification);
         }

         ActiveMQServerLogger.LOGGER.startedAcceptor(acceptorType, host, actualPort, protocolsString);
      }

      if (batchDelay > 0) {
         flusher = new BatchFlusher();

         batchFlusherFuture = scheduledThreadPool.scheduleWithFixedDelay(flusher, batchDelay, batchDelay, TimeUnit.MILLISECONDS);
      }
   }