in rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java [88:138]
/**
 * Registers a new client with the factory's RPC server, starts the driver via
 * {@code startDriver}, and arms a handshake timeout.
 *
 * <p>Note that the supplied {@code conf} is modified in place: the launcher address and port,
 * the generated client ID and the generated secret are written into it before the driver is
 * started.
 *
 * <p>If any step inside the setup fails, {@code dispose(true)} is invoked and the failure is
 * rethrown through {@code Utils.propagate}.
 *
 * @param factory client factory whose RPC server is used for registration, address/port
 *                lookup and scheduling of the timeout task
 * @param conf    configuration read for the launcher address and handshake timeout, and
 *                updated with the connection settings described above
 * @throws IOException declared by this constructor; see {@code startDriver} and
 *                     {@code Utils.propagate} for what is actually raised
 */
private ContextLauncher(RSCClientFactory factory, RSCConf conf) throws IOException {
  this.promise = factory.getServer().getEventLoopGroup().next().newPromise();
  this.clientId = UUID.randomUUID().toString();
  this.secret = factory.getServer().createSecret();
  this.conf = conf;
  this.factory = factory;

  final RegistrationHandler handler = new RegistrationHandler();
  try {
    // Register first so the server knows this client ID / secret pair before the driver
    // is launched.
    factory.getServer().registerClient(clientId, secret, handler);

    // In some scenarios the user may need to configure this endpoint setting explicitly.
    String address = conf.get(LAUNCHER_ADDRESS);
    // If not specified (null or blank), use the RPC server address; otherwise use the
    // specified address.
    if (address == null || address.trim().isEmpty()) {
      address = factory.getServer().getAddress();
    }

    // Publish the connection settings into the configuration handed to startDriver.
    conf.set(LAUNCHER_ADDRESS, address);
    conf.set(LAUNCHER_PORT, factory.getServer().getPort());
    conf.set(CLIENT_ID, clientId);
    conf.set(CLIENT_SECRET, secret);

    // The listener is attached before the driver is started, so `child` may not have been
    // assigned yet when it fires; hence the null check.
    Utils.addListener(promise, new FutureListener<ContextInfo>() {
      @Override
      public void onFailure(Throwable error) throws Exception {
        // If promise is cancelled or failed, make sure spark-submit is not leaked.
        if (child != null) {
          child.kill();
        }
      }
    });

    this.child = startDriver(conf, promise);

    // Set up a timeout to fail the promise if we don't hear back from the context
    // after a configurable timeout.
    Runnable timeoutTask = new Runnable() {
      @Override
      public void run() {
        connectTimeout(handler);
      }
    };
    this.timeout = factory.getServer().getEventLoopGroup().schedule(timeoutTask,
      conf.getTimeAsMs(RPC_CLIENT_HANDSHAKE_TIMEOUT), TimeUnit.MILLISECONDS);
  } catch (Exception e) {
    // Release whatever was set up so far before surfacing the failure to the caller.
    dispose(true);
    throw Utils.propagate(e);
  }
}