in rsc/src/main/java/org/apache/livy/rsc/rpc/Rpc.java [92:151]
public static Promise<Rpc> createClient(
final RSCConf config,
final EventLoopGroup eloop,
String host,
int port,
final String clientId,
final String secret,
final RpcDispatcher dispatcher) throws Exception {
int connectTimeoutMs = (int) config.getTimeAsMs(RPC_CLIENT_CONNECT_TIMEOUT);
final ChannelFuture cf = new Bootstrap()
.group(eloop)
.handler(new ChannelInboundHandlerAdapter() { })
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMs)
.connect(host, port);
final Promise<Rpc> promise = eloop.next().newPromise();
final AtomicReference<Rpc> rpc = new AtomicReference<Rpc>();
// Set up a timeout to undo everything.
final Runnable timeoutTask = new Runnable() {
@Override
public void run() {
promise.setFailure(new TimeoutException("Timed out waiting for RPC server connection."));
}
};
final ScheduledFuture<?> timeoutFuture = eloop.schedule(timeoutTask,
config.getTimeAsMs(RPC_CLIENT_HANDSHAKE_TIMEOUT), TimeUnit.MILLISECONDS);
// The channel listener instantiates the Rpc instance when the connection is established,
// and initiates the SASL handshake.
cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture cf) throws Exception {
if (cf.isSuccess()) {
SaslClientHandler saslHandler = new SaslClientHandler(config, clientId, promise,
timeoutFuture, secret, dispatcher);
Rpc rpc = createRpc(config, saslHandler, (SocketChannel) cf.channel(), eloop);
saslHandler.rpc = rpc;
saslHandler.sendHello(cf.channel());
} else {
promise.setFailure(cf.cause());
}
}
});
// Handle cancellation of the promise.
promise.addListener(new GenericFutureListener<Promise<Rpc>>() {
@Override
public void operationComplete(Promise<Rpc> p) {
if (p.isCancelled()) {
cf.cancel(true);
}
}
});
return promise;
}