private static ChildProcess startDriver()

in rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java [162:253]


  /**
   * Finalizes the Spark-related settings in {@code conf} and starts the remote driver.
   *
   * <p>If {@code LIVY_JARS} is not set in the configuration, the jar list is built from the
   * contents of a jars directory located under the {@code LIVY_HOME} environment variable.
   * The configuration is modified in place (e.g. {@code spark.yarn.maxAppAttempts} is forced
   * to {@code 1}) and is then handed to {@code writeConfToFile}.
   *
   * <p>The driver is started in one of three ways, checked in this order:
   * <ol>
   *   <li>through {@code ContextLauncher.mockSparkSubmit}, when it is non-null;</li>
   *   <li>in-process, when {@code CLIENT_IN_PROCESS} is enabled;</li>
   *   <li>through a {@link SparkLauncher} otherwise.</li>
   * </ol>
   *
   * @param conf the RSC configuration; mutated by this method.
   * @param promise passed through unchanged to the returned {@code ChildProcess}.
   * @return the {@code ChildProcess} wrapping the started driver.
   * @throws IOException if a jars directory cannot be listed, or if one of the called helpers
   *     (such as {@code SparkLauncher.launch()}) fails with an I/O error.
   */
  private static ChildProcess startDriver(final RSCConf conf, Promise<?> promise)
      throws IOException {
    String livyJars = conf.get(LIVY_JARS);
    if (livyJars == null) {
      String livyHome = System.getenv("LIVY_HOME");
      Utils.checkState(livyHome != null,
        "Need one of LIVY_HOME or %s set.", LIVY_JARS.key());
      File rscJars = new File(livyHome, "rsc-jars");
      List<File> allJars = new ArrayList<>();
      if (!rscJars.isDirectory()) {
        rscJars = new File(livyHome, "rsc/target/jars");

        // To ease development, also add the thriftserver's session jars to the Spark app.
        // On a release package, these jars should have been packaged in the proper "rsc-jars"
        // directory.
        File tsJars = new File(livyHome, "thriftserver/session/target/jars");
        if (tsJars.isDirectory()) {
          allJars.add(tsJars);
        }
      }

      Utils.checkState(rscJars.isDirectory(),
        "Cannot find rsc jars directory under LIVY_HOME.");
      allJars.add(rscJars);

      List<String> jars = new ArrayList<>();
      for (File dir : allJars) {
        // File.listFiles() returns null (rather than throwing) when the directory cannot be
        // read, which can still happen after a successful isDirectory() check. Surface that
        // as an IOException instead of letting the loop below fail with a bare NPE.
        File[] files = dir.listFiles();
        if (files == null) {
          throw new IOException("Cannot list files in directory " + dir.getAbsolutePath());
        }
        for (File f : files) {
          jars.add(f.getAbsolutePath());
        }
      }
      livyJars = Utils.join(jars, ",");
    }
    merge(conf, SPARK_JARS_KEY, livyJars, ",");

    merge(conf, SPARK_ARCHIVES_KEY, conf.get(RSCConf.Entry.SPARKR_PACKAGE), ",");
    merge(conf, "spark.submit.pyFiles", conf.get(RSCConf.Entry.PYSPARK_ARCHIVES), ",");

    // Disable multiple attempts since the RPC server doesn't yet support multiple
    // connections for the same registered app.
    conf.set("spark.yarn.maxAppAttempts", "1");

    // Let the launcher go away when launching in yarn cluster mode. This avoids keeping lots
    // of "small" Java processes lingering on the Livy server node.
    conf.set("spark.yarn.submit.waitAppCompletion", "false");

    if (!conf.getBoolean(CLIENT_IN_PROCESS) &&
        // For tests which don't shut down RscDriver gracefully, the JaCoCo exec file isn't
        // dumped properly. Disable JaCoCo for this case.
        !conf.getBoolean(TEST_NO_CODE_COVERAGE_ANALYSIS)) {
      // For testing; propagate jacoco settings so that we also do coverage analysis
      // on the launched driver. We replace the name of the main file ("main.exec")
      // so that we don't end up fighting with the main test launcher.
      String jacocoArgs = TestUtils.getJacocoArgs();
      if (jacocoArgs != null) {
        merge(conf, SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, jacocoArgs, " ");
      }
    }

    // Written only after all the adjustments above, so the file reflects the final settings.
    final File confFile = writeConfToFile(conf);

    if (ContextLauncher.mockSparkSubmit != null) {
      LOG.warn("!!!! Using mock spark-submit. !!!!");
      return new ChildProcess(conf, promise, ContextLauncher.mockSparkSubmit, confFile);
    } else if (conf.getBoolean(CLIENT_IN_PROCESS)) {
      // Mostly for testing things quickly. Do not do this in production.
      LOG.warn("!!!! Running remote driver in-process. !!!!");
      Runnable child = new Runnable() {
        @Override
        public void run() {
          try {
            RSCDriverBootstrapper.main(new String[] { confFile.getAbsolutePath() });
          } catch (Exception e) {
            throw Utils.propagate(e);
          }
        }
      };
      return new ChildProcess(conf, promise, child, confFile);
    } else {
      final SparkLauncher launcher = new SparkLauncher();
      launcher.setSparkHome(System.getenv(SPARK_HOME_ENV));
      launcher.setAppResource(SparkLauncher.NO_RESOURCE);
      launcher.setPropertiesFile(confFile.getAbsolutePath());
      launcher.setMainClass(RSCDriverBootstrapper.class.getName());

      // Only pass --proxy-user when one is configured.
      final String proxyUser = conf.get(PROXY_USER);
      if (proxyUser != null) {
        launcher.addSparkArg("--proxy-user", proxyUser);
      }

      return new ChildProcess(conf, promise, launcher.launch(), confFile);
    }
  }