in tso-server/src/main/java/org/apache/omid/tso/TSOChannelHandler.java [88:122]
/**
 * Stores the collaborators and configures the Netty {@link ServerBootstrap} used by this handler.
 *
 * <p>The constructor creates a boss and a worker NIO event loop group with named threads and
 * registers a child-channel initializer on the bootstrap. No bind call is made here.
 *
 * <p>Each accepted channel gets the following pipeline, in order: optional TLS handling (only when
 * {@code config.getTlsEnabled()} is true), a length-field based frame decoder, a 4-byte length
 * prepender, a protobuf decoder for {@code TSOProto.Request}, a protobuf encoder, and finally this
 * handler instance itself.
 *
 * @param config           server configuration; consulted for the TLS settings on each new channel
 * @param requestProcessor request processor kept in the corresponding field
 * @param metrics          metrics registry kept in the corresponding field
 */
public TSOChannelHandler(TSOServerConfig config, RequestProcessor requestProcessor, MetricsRegistry metrics) {
    this.config = config;
    this.metrics = metrics;
    this.requestProcessor = requestProcessor;

    // Setup netty listener: worker pool size is derived from the number of available processors.
    final int availableCores = Runtime.getRuntime().availableProcessors();
    final int ioThreads = (availableCores * 2 + 1) * 2;

    final ThreadFactory bossThreads =
            new ThreadFactoryBuilder().setNameFormat("tsoserver-boss-%d").build();
    final ThreadFactory ioWorkerThreads =
            new ThreadFactoryBuilder().setNameFormat("tsoserver-worker-%d").build();

    final EventLoopGroup ioGroup = new NioEventLoopGroup(ioThreads, ioWorkerThreads);
    final EventLoopGroup acceptorGroup = new NioEventLoopGroup(bossThreads);

    this.bootstrap = new ServerBootstrap();
    this.bootstrap
            .group(acceptorGroup, ioGroup)
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel channel) throws Exception {
                    final ChannelPipeline channelPipeline = channel.pipeline();

                    // TLS handling, when enabled, is installed ahead of the framing and codec handlers.
                    if (config.getTlsEnabled()) {
                        initSSL(channelPipeline, config.getSupportPlainText());
                    }

                    // Max packet length is 10MB. Transactions with so many cells
                    // that the packet is rejected will receive a ServiceUnavailableException.
                    // 10MB is enough for 2 million cells in a transaction though.
                    final int maxFrameBytes = 10 * 1024 * 1024;

                    channelPipeline
                            .addLast("lengthbaseddecoder",
                                    new LengthFieldBasedFrameDecoder(maxFrameBytes, 0, 4, 0, 4))
                            .addLast("lengthprepender", new LengthFieldPrepender(4))
                            .addLast("protobufdecoder",
                                    new ProtobufDecoder(TSOProto.Request.getDefaultInstance()))
                            .addLast("protobufencoder", new ProtobufEncoder())
                            .addLast("handler", TSOChannelHandler.this);
                }
            });
}