private void initializeServer()

in rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java [153:224]


  private void initializeServer() throws Exception {
    String clientId = livyConf.get(CLIENT_ID);
    Utils.checkArgument(clientId != null, "No client ID provided.");
    String secret = livyConf.get(CLIENT_SECRET);
    Utils.checkArgument(secret != null, "No secret provided.");

    String launcherAddress = livyConf.get(LAUNCHER_ADDRESS);
    Utils.checkArgument(launcherAddress != null, "Missing launcher address.");
    int launcherPort = livyConf.getInt(LAUNCHER_PORT);
    Utils.checkArgument(launcherPort > 0, "Missing launcher port.");

    LOG.info("Connecting to: {}:{}", launcherAddress, launcherPort);

    // We need to unset this configuration since it doesn't really apply for the driver side.
    // If the driver runs on a multi-homed machine, this can lead to issues where the Livy
    // server cannot connect to the auto-detected address, but since the driver can run anywhere
    // on the cluster, it would be tricky to solve that problem in a generic way.
    livyConf.set(RPC_SERVER_ADDRESS, null);

    // If we are running on Kubernetes, get RPC_SERVER_ADDRESS from "spark.driver.host" option
    // this option is set in class org.apache.spark.deploy.k8s.features.DriverServiceFeatureStep:
    // line 61: val driverHostname = s"$resolvedServiceName.${kubernetesConf.namespace()}.svc"
    if (conf.get("spark.master").startsWith("k8s")) {
      livyConf.set(RPC_SERVER_ADDRESS, conf.get("spark.driver.host"));
    }

    if (livyConf.getBoolean(TEST_STUCK_START_DRIVER)) {
      // Test flag is turned on so we will just infinite loop here. It should cause
      // timeout and we should still see yarn application being cleaned up.
      LOG.info("Infinite looping as test flag TEST_STUCK_START_SESSION is turned on.");
      while(true) {
        try {
          TimeUnit.MINUTES.sleep(10);
        } catch (InterruptedException e) {
          LOG.warn("Interrupted during test sleep.", e);
        }
      }
    }

    // Bring up the RpcServer an register the secret provided by the Livy server as a client.
    LOG.info("Starting RPC server...");
    this.server = new RpcServer(livyConf);
    server.registerClient(clientId, secret, new RpcServer.ClientCallback() {
      @Override
      public RpcDispatcher onNewClient(Rpc client) {
        registerClient(client);
        return RSCDriver.this;
      }

      @Override
      public void onSaslComplete(Rpc client) {
        onClientAuthenticated(client);
      }
    });

    // The RPC library takes care of timing out this.
    Rpc callbackRpc = Rpc.createClient(livyConf, server.getEventLoopGroup(),
      launcherAddress, launcherPort, clientId, secret, this).get();
    try {
      callbackRpc.call(new RemoteDriverAddress(server.getAddress(), server.getPort())).get(
        livyConf.getTimeAsMs(RPC_CLIENT_HANDSHAKE_TIMEOUT), TimeUnit.MILLISECONDS);
    } catch (TimeoutException te) {
      LOG.warn("Timed out sending address to Livy server, shutting down.");
      throw te;
    } finally {
      callbackRpc.close();
    }

    // At this point we install the idle timeout handler, in case the Livy server fails to connect
    // back.
    setupIdleTimeout();
  }