in transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java [126:198]
/**
 * Builds a TSO client from the supplied configuration.
 *
 * <p>The constructor copies the request timeout, retry and reconnection settings,
 * resolves the TSO address (from ZooKeeper for {@code HA}, or straight from the
 * connection string otherwise), creates the single-threaded FSM executor with the
 * state machine initialised to {@code DisconnectedState}, and configures the Netty
 * {@code Bootstrap} (worker group, channel pipeline and socket options).
 *
 * @param omidConf client configuration the settings are read from
 * @throws IOException if the TSO info read from ZooKeeper is not of the form
 *                     {@code <host:port>#<epoch>}; may also be propagated by the
 *                     helpers invoked here
 */
private TSOClient(OmidClientConfiguration omidConf) throws IOException {

    requestTimeoutInMs = omidConf.getRequestTimeoutInMs();
    requestMaxRetries = omidConf.getRequestMaxRetries();
    tsoReconnectionDelayInSecs = omidConf.getReconnectionDelayInSecs();

    LOG.info("Connecting to TSO...");
    // Assigned exactly once on every switch path so it stays effectively final
    // and can be captured by the channel initializer below.
    HostAndPort hp;
    switch (omidConf.getConnectionType()) {
        case HA:
            zkClient = ZKUtils.initZKClient(omidConf.getConnectionString(),
                                            omidConf.getZkNamespace(),
                                            omidConf.getZkConnectionTimeoutInSecs());
            zkCurrentTsoPath = omidConf.getZkCurrentTsoPath();
            configureCurrentTSOServerZNodeCache(zkCurrentTsoPath);
            String tsoInfo = getCurrentTSOInfoFoundInZK(zkCurrentTsoPath);
            // TSO info includes the new TSO host:port address and epoch
            String[] currentTSOAndEpochArray = tsoInfo.split("#");
            // Fail with a descriptive error instead of an ArrayIndexOutOfBoundsException
            // when the znode payload lacks the '#' separator or the epoch part.
            if (currentTSOAndEpochArray.length < 2) {
                throw new IOException("Malformed TSO info in ZK path " + zkCurrentTsoPath
                                      + ": [" + tsoInfo + "], expected <host:port>#<epoch>");
            }
            hp = HostAndPort.fromString(currentTSOAndEpochArray[0]);
            setTSOAddress(hp.getHost(), hp.getPort());
            try {
                epoch = Long.parseLong(currentTSOAndEpochArray[1]);
            } catch (NumberFormatException e) {
                throw new IOException("Non-numeric TSO epoch in ZK path " + zkCurrentTsoPath
                                      + ": [" + tsoInfo + "]", e);
            }
            LOG.info("\t* Current TSO host:port found in ZK: {} Epoch {}", hp, getEpoch());
            break;
        case DIRECT:
        default:
            hp = HostAndPort.fromString(omidConf.getConnectionString());
            setTSOAddress(hp.getHost(), hp.getPort());
            LOG.info("\t* TSO host:port {} will be connected directly", hp);
            break;
    }

    // The state machine runs on its own single thread, named tsofsm-N.
    fsmExecutor = Executors.newSingleThreadScheduledExecutor(
            new ThreadFactoryBuilder().setNameFormat("tsofsm-%d").build());
    fsm = new StateMachine.FsmImpl(fsmExecutor);
    fsm.setInitState(new DisconnectedState(fsm));

    // Start client with the configured thread count
    int tsoExecutorThreads = omidConf.getExecutorThreads();
    ThreadFactory workerThreadFactory = new ThreadFactoryBuilder().setNameFormat("tsoclient-worker-%d").build();
    EventLoopGroup workerGroup = new NioEventLoopGroup(tsoExecutorThreads, workerThreadFactory);

    bootstrap = new Bootstrap();
    bootstrap.group(workerGroup);
    bootstrap.channel(NioSocketChannel.class);
    bootstrap.handler(new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel channel) throws Exception {
            ChannelPipeline pipeline = channel.pipeline();
            if (omidConf.getTlsEnabled()) {
                // NOTE(review): hp is the address resolved when this constructor ran. If the
                // TSO address can change afterwards (HA mode), new channels would still pass
                // the original host/port to the SSL handler as the peer - confirm intended.
                SslContext sslContext = getSslContext(omidConf);
                SslHandler sslHandler = sslContext.newHandler(channel.alloc(), hp.getHost(), hp.getPort());
                sslHandler.setHandshakeTimeoutMillis(omidConf.getClientNettyTlsHandshakeTimeout());
                // TLS must sit in front of the framing and protobuf codecs.
                pipeline.addFirst(sslHandler);
                LOG.info("SSL handler added with handshake timeout {} ms",
                         sslHandler.getHandshakeTimeoutMillis());
            }
            // Frames are prefixed with a 4-byte length; inbound frames are capped at 8 KiB
            // and the length prefix is stripped before protobuf decoding.
            pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(8 * 1024, 0, 4, 0, 4));
            pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
            pipeline.addLast("protobufdecoder", new ProtobufDecoder(TSOProto.Response.getDefaultInstance()));
            pipeline.addLast("protobufencoder", new ProtobufEncoder());
            pipeline.addLast("inboundHandler", new Handler(fsm));
        }
    });

    bootstrap.option(ChannelOption.TCP_NODELAY, true);
    bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
    bootstrap.option(ChannelOption.SO_REUSEADDR, true);
    // NOTE(review): connect timeout is hard-coded to 100 ms rather than read from omidConf.
    bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 100);

    lowLatency = false;

    conflictDetectionLevel = omidConf.getConflictAnalysisLevel();

}