in rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java [153:224]
/**
 * Starts the driver-side RPC server and reports its address back to the launcher.
 *
 * <p>Steps, in order:
 * <ol>
 *   <li>Read and validate the client ID, secret, launcher address and launcher port from
 *       {@code livyConf}.</li>
 *   <li>Clear {@code RPC_SERVER_ADDRESS}, or on a Kubernetes master replace it with the value
 *       of {@code spark.driver.host}.</li>
 *   <li>Create the {@link RpcServer} and register the client ID / secret pair with it.</li>
 *   <li>Open a short-lived RPC client to the launcher, send a {@code RemoteDriverAddress}
 *       carrying this server's address and port, and close that client again.</li>
 *   <li>Install the idle timeout handler.</li>
 * </ol>
 *
 * <p>If the {@code TEST_STUCK_START_DRIVER} flag is enabled this method never returns.
 *
 * @throws Exception if a required setting fails its {@code Utils.checkArgument} check, or if
 *     connecting to the launcher or sending the address fails; a {@link TimeoutException} is
 *     logged and rethrown when the address is not acknowledged within
 *     {@code RPC_CLIENT_HANDSHAKE_TIMEOUT}.
 */
private void initializeServer() throws Exception {
  String clientId = livyConf.get(CLIENT_ID);
  Utils.checkArgument(clientId != null, "No client ID provided.");
  String secret = livyConf.get(CLIENT_SECRET);
  Utils.checkArgument(secret != null, "No secret provided.");

  String launcherAddress = livyConf.get(LAUNCHER_ADDRESS);
  Utils.checkArgument(launcherAddress != null, "Missing launcher address.");
  int launcherPort = livyConf.getInt(LAUNCHER_PORT);
  Utils.checkArgument(launcherPort > 0, "Missing launcher port.");

  LOG.info("Connecting to: {}:{}", launcherAddress, launcherPort);

  // We need to unset this configuration since it doesn't really apply for the driver side.
  // If the driver runs on a multi-homed machine, this can lead to issues where the Livy
  // server cannot connect to the auto-detected address, but since the driver can run anywhere
  // on the cluster, it would be tricky to solve that problem in a generic way.
  livyConf.set(RPC_SERVER_ADDRESS, null);

  // If we are running on Kubernetes, get RPC_SERVER_ADDRESS from the "spark.driver.host" option.
  // That option is set in class org.apache.spark.deploy.k8s.features.DriverServiceFeatureStep:
  // line 61: val driverHostname = s"$resolvedServiceName.${kubernetesConf.namespace()}.svc"
  //
  // The master is looked up with an empty default so that a configuration without
  // "spark.master" is simply treated as "not Kubernetes" instead of failing the lookup.
  String sparkMaster = conf.get("spark.master", "");
  if (sparkMaster.startsWith("k8s")) {
    livyConf.set(RPC_SERVER_ADDRESS, conf.get("spark.driver.host"));
  }

  if (livyConf.getBoolean(TEST_STUCK_START_DRIVER)) {
    // Test flag is turned on so we will just infinite loop here. It should cause
    // timeout and we should still see yarn application being cleaned up.
    LOG.info("Infinite looping as test flag TEST_STUCK_START_DRIVER is turned on.");
    while (true) {
      try {
        TimeUnit.MINUTES.sleep(10);
      } catch (InterruptedException e) {
        // Deliberately only logged: the interrupt status is not restored, because doing so
        // would make the next sleep() throw immediately and turn this hang into a busy loop.
        LOG.warn("Interrupted during test sleep.", e);
      }
    }
  }

  // Bring up the RpcServer and register the secret provided by the Livy server as a client.
  LOG.info("Starting RPC server...");
  this.server = new RpcServer(livyConf);
  server.registerClient(clientId, secret, new RpcServer.ClientCallback() {
    @Override
    public RpcDispatcher onNewClient(Rpc client) {
      registerClient(client);
      return RSCDriver.this;
    }

    @Override
    public void onSaslComplete(Rpc client) {
      onClientAuthenticated(client);
    }
  });

  // The RPC library takes care of timing out this.
  Rpc callbackRpc = Rpc.createClient(livyConf, server.getEventLoopGroup(),
      launcherAddress, launcherPort, clientId, secret, this).get();
  try {
    // Tell the launcher where this driver's RPC server is listening, waiting at most the
    // configured handshake timeout for the call to complete.
    callbackRpc.call(new RemoteDriverAddress(server.getAddress(), server.getPort())).get(
        livyConf.getTimeAsMs(RPC_CLIENT_HANDSHAKE_TIMEOUT), TimeUnit.MILLISECONDS);
  } catch (TimeoutException te) {
    LOG.warn("Timed out sending address to Livy server, shutting down.");
    throw te;
  } finally {
    // The callback channel is only needed for this one message.
    callbackRpc.close();
  }

  // At this point we install the idle timeout handler, in case the Livy server fails to connect
  // back.
  setupIdleTimeout();
}