in rsc/src/main/java/org/apache/livy/rsc/RSCClient.java [111:150]
/**
 * Records the given context information and starts an RPC client connection to the
 * remote context it describes.
 *
 * <p>A listener is registered on the promise returned by {@code Rpc.createClient}:
 * <ul>
 *   <li>on success, {@code driverRpc} is completed with the connected {@link Rpc} and a
 *       second listener is attached to the channel's close future so that an unexpected
 *       close while the client is still alive triggers {@code stop(false)};</li>
 *   <li>on failure, {@code driverRpc} is failed with the error and
 *       {@code connectionError} is invoked.</li>
 * </ul>
 *
 * <p>Exceptions thrown while creating the client or registering the listener are handled
 * the same way as a failed connection promise: {@code driverRpc} is failed and
 * {@code connectionError} is invoked, so {@code driverRpc} does not stay pending.
 *
 * @param info address, port, client id and secret of the remote context to connect to
 * @throws Exception declared by the signature; exceptions raised inside the
 *         connection setup are caught and routed to {@code connectionError}
 */
private synchronized void connectToContext(final ContextInfo info) throws Exception {
  this.contextInfo = info;
  try {
    Promise<Rpc> promise = Rpc.createClient(conf,
      eventLoopGroup,
      info.remoteAddress,
      info.remotePort,
      info.clientId,
      info.secret,
      protocol);
    Utils.addListener(promise, new FutureListener<Rpc>() {
      @Override
      public void onSuccess(Rpc rpc) throws Exception {
        // Publish the connected RPC before watching the channel for closure.
        driverRpc.setSuccess(rpc);
        Utils.addListener(rpc.getChannel().closeFuture(), new FutureListener<Void>() {
          @Override
          public void onSuccess(Void unused) {
            // A close while the client is still alive was not initiated through stop().
            if (isAlive) {
              LOG.warn("Client RPC channel closed unexpectedly.");
              try {
                stop(false);
              } catch (Exception e) { /* stop() itself prints warning. */ }
            }
          }
        });
        LOG.debug("Connected to context {} ({}, {}).", info.clientId,
          rpc.getChannel(), executorGroupId);
      }

      @Override
      public void onFailure(Throwable error) throws Exception {
        driverRpc.setFailure(error);
        connectionError(error);
      }
    });
  } catch (Exception e) {
    // Mirror the asynchronous failure path: without this, a synchronous failure would
    // leave driverRpc pending forever. tryFailure (rather than setFailure) is used so
    // that an already-completed promise cannot throw and skip connectionError().
    driverRpc.tryFailure(e);
    connectionError(e);
  }
}