public static Promise createClient()

in rsc/src/main/java/org/apache/livy/rsc/rpc/Rpc.java [92:151]


  public static Promise<Rpc> createClient(
      final RSCConf config,
      final EventLoopGroup eloop,
      String host,
      int port,
      final String clientId,
      final String secret,
      final RpcDispatcher dispatcher) throws Exception {
    int connectTimeoutMs = (int) config.getTimeAsMs(RPC_CLIENT_CONNECT_TIMEOUT);

    final ChannelFuture cf = new Bootstrap()
        .group(eloop)
        .handler(new ChannelInboundHandlerAdapter() { })
        .channel(NioSocketChannel.class)
        .option(ChannelOption.SO_KEEPALIVE, true)
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMs)
        .connect(host, port);

    final Promise<Rpc> promise = eloop.next().newPromise();
    final AtomicReference<Rpc> rpc = new AtomicReference<Rpc>();

    // Set up a timeout to undo everything.
    final Runnable timeoutTask = new Runnable() {
      @Override
      public void run() {
        promise.setFailure(new TimeoutException("Timed out waiting for RPC server connection."));
      }
    };
    final ScheduledFuture<?> timeoutFuture = eloop.schedule(timeoutTask,
        config.getTimeAsMs(RPC_CLIENT_HANDSHAKE_TIMEOUT), TimeUnit.MILLISECONDS);

    // The channel listener instantiates the Rpc instance when the connection is established,
    // and initiates the SASL handshake.
    cf.addListener(new ChannelFutureListener() {
      @Override
      public void operationComplete(ChannelFuture cf) throws Exception {
        if (cf.isSuccess()) {
          SaslClientHandler saslHandler = new SaslClientHandler(config, clientId, promise,
            timeoutFuture, secret, dispatcher);
          Rpc rpc = createRpc(config, saslHandler, (SocketChannel) cf.channel(), eloop);
          saslHandler.rpc = rpc;
          saslHandler.sendHello(cf.channel());
        } else {
          promise.setFailure(cf.cause());
        }
      }
    });

    // Handle cancellation of the promise.
    promise.addListener(new GenericFutureListener<Promise<Rpc>>() {
      @Override
      public void operationComplete(Promise<Rpc> p) {
        if (p.isCancelled()) {
          cf.cancel(true);
        }
      }
    });

    return promise;
  }