private void listenOn()

in bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java [313:438]


    private void listenOn(InetSocketAddress address, BookieSocketAddress bookieAddress) throws InterruptedException {
        if (!conf.isDisableServerSocketBind()) {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.option(ChannelOption.ALLOCATOR, allocator);
            bootstrap.childOption(ChannelOption.ALLOCATOR, allocator);
            bootstrap.group(acceptorGroup, eventLoopGroup);
            bootstrap.childOption(ChannelOption.TCP_NODELAY, conf.getServerTcpNoDelay());
            bootstrap.childOption(ChannelOption.SO_LINGER, conf.getServerSockLinger());
            bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,
                    new AdaptiveRecvByteBufAllocator(conf.getRecvByteBufAllocatorSizeMin(),
                            conf.getRecvByteBufAllocatorSizeInitial(), conf.getRecvByteBufAllocatorSizeMax()));
            bootstrap.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
                    conf.getServerWriteBufferLowWaterMark(), conf.getServerWriteBufferHighWaterMark()));

            if (eventLoopGroup instanceof IOUringEventLoopGroup){
                bootstrap.channel(IOUringServerSocketChannel.class);
            } else if (eventLoopGroup instanceof EpollEventLoopGroup) {
                bootstrap.channel(EpollServerSocketChannel.class);
            } else {
                bootstrap.channel(NioServerSocketChannel.class);
            }

            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    synchronized (suspensionLock) {
                        while (suspended) {
                            suspensionLock.wait();
                        }
                    }

                    BookieSideConnectionPeerContextHandler contextHandler =
                        new BookieSideConnectionPeerContextHandler();
                    ChannelPipeline pipeline = ch.pipeline();

                    pipeline.addLast(CONSOLIDATION_HANDLER_NAME, new FlushConsolidationHandler(1024, true));

                    pipeline.addLast("bytebufList", ByteBufList.ENCODER);

                    pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4, 0, 4));

                    pipeline.addLast("bookieProtoDecoder", new BookieProtoEncoding.RequestDecoder(registry));
                    pipeline.addLast("bookieProtoEncoder", new BookieProtoEncoding.ResponseEncoder(registry));
                    pipeline.addLast("bookieAuthHandler", new AuthHandler.ServerSideHandler(
                                contextHandler.getConnectionPeer(), authProviderFactory));

                    ChannelInboundHandler requestHandler = isRunning.get()
                            ? new BookieRequestHandler(conf, requestProcessor, allChannels)
                            : new RejectRequestHandler();
                    pipeline.addLast("bookieRequestHandler", requestHandler);

                    pipeline.addLast("contextHandler", contextHandler);
                }
            });

            // Bind and start to accept incoming connections
            LOG.info("Binding bookie-rpc endpoint to {}", address);
            Channel listen = bootstrap.bind(address.getAddress(), address.getPort()).sync().channel();

            if (listen.localAddress() instanceof InetSocketAddress) {
                if (conf.getBookiePort() == 0) {
                    // this is really really nasty. It's using the configuration object as a notification
                    // bus. We should get rid of this at some point
                    conf.setBookiePort(((InetSocketAddress) listen.localAddress()).getPort());
                }
            }

        }

        if (conf.isEnableLocalTransport()) {
            ServerBootstrap jvmBootstrap = new ServerBootstrap();
            jvmBootstrap.childOption(ChannelOption.ALLOCATOR, new PooledByteBufAllocator(true));
            jvmBootstrap.group(jvmEventLoopGroup, jvmEventLoopGroup);
            jvmBootstrap.childOption(ChannelOption.TCP_NODELAY, conf.getServerTcpNoDelay());
            jvmBootstrap.childOption(ChannelOption.SO_KEEPALIVE, conf.getServerSockKeepalive());
            jvmBootstrap.childOption(ChannelOption.SO_LINGER, conf.getServerSockLinger());
            jvmBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,
                    new AdaptiveRecvByteBufAllocator(conf.getRecvByteBufAllocatorSizeMin(),
                            conf.getRecvByteBufAllocatorSizeInitial(), conf.getRecvByteBufAllocatorSizeMax()));
            jvmBootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
                    conf.getServerWriteBufferLowWaterMark(), conf.getServerWriteBufferHighWaterMark()));

            if (jvmEventLoopGroup instanceof DefaultEventLoopGroup) {
                jvmBootstrap.channel(LocalServerChannel.class);
            } else if (jvmEventLoopGroup instanceof IOUringEventLoopGroup) {
                jvmBootstrap.channel(IOUringServerSocketChannel.class);
            } else if (jvmEventLoopGroup instanceof EpollEventLoopGroup) {
                jvmBootstrap.channel(EpollServerSocketChannel.class);
            } else {
                jvmBootstrap.channel(NioServerSocketChannel.class);
            }

            jvmBootstrap.childHandler(new ChannelInitializer<LocalChannel>() {
                @Override
                protected void initChannel(LocalChannel ch) throws Exception {
                    synchronized (suspensionLock) {
                        while (suspended) {
                            suspensionLock.wait();
                        }
                    }

                    BookieSideConnectionPeerContextHandler contextHandler =
                        new BookieSideConnectionPeerContextHandler();
                    ChannelPipeline pipeline = ch.pipeline();

                    pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4, 0, 4));

                    pipeline.addLast("bookieProtoDecoder", new BookieProtoEncoding.RequestDecoder(registry));
                    pipeline.addLast("bookieProtoEncoder", new BookieProtoEncoding.ResponseEncoder(registry));
                    pipeline.addLast("bookieAuthHandler", new AuthHandler.ServerSideHandler(
                                contextHandler.getConnectionPeer(), authProviderFactory));

                    ChannelInboundHandler requestHandler = isRunning.get()
                            ? new BookieRequestHandler(conf, requestProcessor, allChannels)
                            : new RejectRequestHandler();
                    pipeline.addLast("bookieRequestHandler", requestHandler);

                    pipeline.addLast("contextHandler", contextHandler);
                }
            });
            LOG.info("Binding jvm bookie-rpc endpoint to {}", bookieId.toString());
            // use the same address 'name', so clients can find local Bookie still discovering them using ZK
            jvmBootstrap.bind(new LocalAddress(bookieId.toString())).sync();
            LocalBookiesRegistry.registerLocalBookieAddress(bookieId);
        }
    }