in bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java [313:438]
private void listenOn(InetSocketAddress address, BookieSocketAddress bookieAddress) throws InterruptedException {
if (!conf.isDisableServerSocketBind()) {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.option(ChannelOption.ALLOCATOR, allocator);
bootstrap.childOption(ChannelOption.ALLOCATOR, allocator);
bootstrap.group(acceptorGroup, eventLoopGroup);
bootstrap.childOption(ChannelOption.TCP_NODELAY, conf.getServerTcpNoDelay());
bootstrap.childOption(ChannelOption.SO_LINGER, conf.getServerSockLinger());
bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,
new AdaptiveRecvByteBufAllocator(conf.getRecvByteBufAllocatorSizeMin(),
conf.getRecvByteBufAllocatorSizeInitial(), conf.getRecvByteBufAllocatorSizeMax()));
bootstrap.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
conf.getServerWriteBufferLowWaterMark(), conf.getServerWriteBufferHighWaterMark()));
if (eventLoopGroup instanceof IOUringEventLoopGroup){
bootstrap.channel(IOUringServerSocketChannel.class);
} else if (eventLoopGroup instanceof EpollEventLoopGroup) {
bootstrap.channel(EpollServerSocketChannel.class);
} else {
bootstrap.channel(NioServerSocketChannel.class);
}
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
synchronized (suspensionLock) {
while (suspended) {
suspensionLock.wait();
}
}
BookieSideConnectionPeerContextHandler contextHandler =
new BookieSideConnectionPeerContextHandler();
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(CONSOLIDATION_HANDLER_NAME, new FlushConsolidationHandler(1024, true));
pipeline.addLast("bytebufList", ByteBufList.ENCODER);
pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4, 0, 4));
pipeline.addLast("bookieProtoDecoder", new BookieProtoEncoding.RequestDecoder(registry));
pipeline.addLast("bookieProtoEncoder", new BookieProtoEncoding.ResponseEncoder(registry));
pipeline.addLast("bookieAuthHandler", new AuthHandler.ServerSideHandler(
contextHandler.getConnectionPeer(), authProviderFactory));
ChannelInboundHandler requestHandler = isRunning.get()
? new BookieRequestHandler(conf, requestProcessor, allChannels)
: new RejectRequestHandler();
pipeline.addLast("bookieRequestHandler", requestHandler);
pipeline.addLast("contextHandler", contextHandler);
}
});
// Bind and start to accept incoming connections
LOG.info("Binding bookie-rpc endpoint to {}", address);
Channel listen = bootstrap.bind(address.getAddress(), address.getPort()).sync().channel();
if (listen.localAddress() instanceof InetSocketAddress) {
if (conf.getBookiePort() == 0) {
// this is really really nasty. It's using the configuration object as a notification
// bus. We should get rid of this at some point
conf.setBookiePort(((InetSocketAddress) listen.localAddress()).getPort());
}
}
}
if (conf.isEnableLocalTransport()) {
ServerBootstrap jvmBootstrap = new ServerBootstrap();
jvmBootstrap.childOption(ChannelOption.ALLOCATOR, new PooledByteBufAllocator(true));
jvmBootstrap.group(jvmEventLoopGroup, jvmEventLoopGroup);
jvmBootstrap.childOption(ChannelOption.TCP_NODELAY, conf.getServerTcpNoDelay());
jvmBootstrap.childOption(ChannelOption.SO_KEEPALIVE, conf.getServerSockKeepalive());
jvmBootstrap.childOption(ChannelOption.SO_LINGER, conf.getServerSockLinger());
jvmBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,
new AdaptiveRecvByteBufAllocator(conf.getRecvByteBufAllocatorSizeMin(),
conf.getRecvByteBufAllocatorSizeInitial(), conf.getRecvByteBufAllocatorSizeMax()));
jvmBootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
conf.getServerWriteBufferLowWaterMark(), conf.getServerWriteBufferHighWaterMark()));
if (jvmEventLoopGroup instanceof DefaultEventLoopGroup) {
jvmBootstrap.channel(LocalServerChannel.class);
} else if (jvmEventLoopGroup instanceof IOUringEventLoopGroup) {
jvmBootstrap.channel(IOUringServerSocketChannel.class);
} else if (jvmEventLoopGroup instanceof EpollEventLoopGroup) {
jvmBootstrap.channel(EpollServerSocketChannel.class);
} else {
jvmBootstrap.channel(NioServerSocketChannel.class);
}
jvmBootstrap.childHandler(new ChannelInitializer<LocalChannel>() {
@Override
protected void initChannel(LocalChannel ch) throws Exception {
synchronized (suspensionLock) {
while (suspended) {
suspensionLock.wait();
}
}
BookieSideConnectionPeerContextHandler contextHandler =
new BookieSideConnectionPeerContextHandler();
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4, 0, 4));
pipeline.addLast("bookieProtoDecoder", new BookieProtoEncoding.RequestDecoder(registry));
pipeline.addLast("bookieProtoEncoder", new BookieProtoEncoding.ResponseEncoder(registry));
pipeline.addLast("bookieAuthHandler", new AuthHandler.ServerSideHandler(
contextHandler.getConnectionPeer(), authProviderFactory));
ChannelInboundHandler requestHandler = isRunning.get()
? new BookieRequestHandler(conf, requestProcessor, allChannels)
: new RejectRequestHandler();
pipeline.addLast("bookieRequestHandler", requestHandler);
pipeline.addLast("contextHandler", contextHandler);
}
});
LOG.info("Binding jvm bookie-rpc endpoint to {}", bookieId.toString());
// use the same address 'name', so clients can find local Bookie still discovering them using ZK
jvmBootstrap.bind(new LocalAddress(bookieId.toString())).sync();
LocalBookiesRegistry.registerLocalBookieAddress(bookieId);
}
}