private TSOClient()

in transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java [126:198]


    private TSOClient(OmidClientConfiguration omidConf) throws IOException {

        requestTimeoutInMs = omidConf.getRequestTimeoutInMs();
        requestMaxRetries = omidConf.getRequestMaxRetries();
        tsoReconnectionDelayInSecs = omidConf.getReconnectionDelayInSecs();

        LOG.info("Connecting to TSO...");
        HostAndPort hp;
        switch (omidConf.getConnectionType()) {
            case HA:
                zkClient = ZKUtils.initZKClient(omidConf.getConnectionString(),
                                                omidConf.getZkNamespace(),
                                                omidConf.getZkConnectionTimeoutInSecs());
                zkCurrentTsoPath = omidConf.getZkCurrentTsoPath();
                configureCurrentTSOServerZNodeCache(zkCurrentTsoPath);
                String tsoInfo = getCurrentTSOInfoFoundInZK(zkCurrentTsoPath);
                // TSO info includes the new TSO host:port address and epoch
                String[] currentTSOAndEpochArray = tsoInfo.split("#");
                hp = HostAndPort.fromString(currentTSOAndEpochArray[0]);
                setTSOAddress(hp.getHost(), hp.getPort());
                epoch = Long.parseLong(currentTSOAndEpochArray[1]);
                LOG.info("\t* Current TSO host:port found in ZK: {} Epoch {}", hp, getEpoch());
                break;
            case DIRECT:
            default:
                hp = HostAndPort.fromString(omidConf.getConnectionString());
                setTSOAddress(hp.getHost(), hp.getPort());
                LOG.info("\t* TSO host:port {} will be connected directly", hp);
                break;
        }

        fsmExecutor = Executors.newSingleThreadScheduledExecutor(
                new ThreadFactoryBuilder().setNameFormat("tsofsm-%d").build());
        fsm = new StateMachine.FsmImpl(fsmExecutor);
        fsm.setInitState(new DisconnectedState(fsm));

        // Start client with the configured thread count
        int tsoExecutorThreads = omidConf.getExecutorThreads();
        ThreadFactory workerThreadFactory = new ThreadFactoryBuilder().setNameFormat("tsoclient-worker-%d").build();
        EventLoopGroup workerGroup = new NioEventLoopGroup(tsoExecutorThreads, workerThreadFactory);

        bootstrap = new Bootstrap();
        bootstrap.group(workerGroup);
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel channel) throws Exception {
                ChannelPipeline pipeline = channel.pipeline();
                if (omidConf.getTlsEnabled()){
                    SslContext sslContext = getSslContext(omidConf);
                    SslHandler sslHandler = sslContext.newHandler(channel.alloc(), hp.getHost(), hp.getPort());
                    sslHandler.setHandshakeTimeoutMillis(omidConf.getClientNettyTlsHandshakeTimeout());
                    channel.pipeline().addFirst(sslHandler);
                    LOG.info("SSL handler added with handshake timeout {} ms",
                            sslHandler.getHandshakeTimeoutMillis());
                }
                pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(8 * 1024, 0, 4, 0, 4));
                pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
                pipeline.addLast("protobufdecoder", new ProtobufDecoder(TSOProto.Response.getDefaultInstance()));
                pipeline.addLast("protobufencoder", new ProtobufEncoder());
                pipeline.addLast("inboundHandler", new Handler(fsm));

            }
        });
        bootstrap.option(ChannelOption.TCP_NODELAY, true);
        bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
        bootstrap.option(ChannelOption.SO_REUSEADDR, true);
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 100);

        lowLatency = false;
        conflictDetectionLevel = omidConf.getConflictAnalysisLevel();

    }