in rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java [162:253]
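/**
 * Builds the Spark configuration for a new context and launches the RSC driver:
 * either in-process (tests only), through a mocked spark-submit, or through a real
 * spark-submit invocation built with SparkLauncher.
 */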
private static ChildProcess startDriver(final RSCConf conf, Promise<?> promise)
    throws IOException {
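  // Resolve the jars that must be shipped with the Spark application: an explicit
  // LIVY_JARS setting wins; otherwise fall back to the "rsc-jars" directory under
  // LIVY_HOME, or to the build output directories in a development tree.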
  String livyJars = conf.get(LIVY_JARS);
  if (livyJars == null) {
    String livyHome = System.getenv("LIVY_HOME");
    Utils.checkState(livyHome != null,
      "Need one of LIVY_HOME or %s set.", LIVY_JARS.key());
    File rscJars = new File(livyHome, "rsc-jars");
    List<File> allJars = new ArrayList<>();
    if (!rscJars.isDirectory()) {
      rscJars = new File(livyHome, "rsc/target/jars");
      // To ease development, also add the thriftserver's session jars to the Spark app.
      // On a release package, these jars should have been packaged in the proper
      // "rsc-jars" directory.
      File tsJars = new File(livyHome, "thriftserver/session/target/jars");
      if (tsJars.isDirectory()) {
        allJars.add(tsJars);
      }
    }
    Utils.checkState(rscJars.isDirectory(),
      "Cannot find rsc jars directory under LIVY_HOME.");
    allJars.add(rscJars);

    List<String> jars = new ArrayList<>();
    for (File dir : allJars) {
      for (File f : dir.listFiles()) {
        jars.add(f.getAbsolutePath());
      }
    }
    livyJars = Utils.join(jars, ",");
  }
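
  // Fold the Livy jars, the SparkR package, and the PySpark archives into the Spark
  // app's corresponding configs, preserving any values the user already set there.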
  merge(conf, SPARK_JARS_KEY, livyJars, ",");
  merge(conf, SPARK_ARCHIVES_KEY, conf.get(RSCConf.Entry.SPARKR_PACKAGE), ",");
  merge(conf, "spark.submit.pyFiles", conf.get(RSCConf.Entry.PYSPARK_ARCHIVES), ",");

  // Disable multiple attempts since the RPC server doesn't yet support multiple
  // connections for the same registered app.
  conf.set("spark.yarn.maxAppAttempts", "1");

  // Let the launcher process exit as soon as the app is submitted in YARN cluster mode.
  // This avoids keeping lots of "small" Java processes lingering on the Livy server node.
  conf.set("spark.yarn.submit.waitAppCompletion", "false");
  if (!conf.getBoolean(CLIENT_IN_PROCESS) &&
      // For tests that don't shut down the RSCDriver gracefully, the JaCoCo exec file
      // isn't dumped properly, so disable JaCoCo in that case.
      !conf.getBoolean(TEST_NO_CODE_COVERAGE_ANALYSIS)) {
    // For testing: propagate the JaCoCo settings so that we also run coverage analysis
    // on the launched driver. The name of the main output file ("main.exec") is replaced
    // so that we don't end up fighting with the main test launcher over it.
    String jacocoArgs = TestUtils.getJacocoArgs();
    if (jacocoArgs != null) {
      merge(conf, SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, jacocoArgs, " ");
    }
  }
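
  // Write the assembled configuration to a temp file; it reaches the driver either as
  // the bootstrapper's sole argument (in-process case) or as spark-submit's properties
  // file (launcher case below).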
  final File confFile = writeConfToFile(conf);

  if (ContextLauncher.mockSparkSubmit != null) {
    LOG.warn("!!!! Using mock spark-submit. !!!!");
    return new ChildProcess(conf, promise, ContextLauncher.mockSparkSubmit, confFile);
  } else if (conf.getBoolean(CLIENT_IN_PROCESS)) {
    // Mostly for testing things quickly. Do not do this in production.
    LOG.warn("!!!! Running remote driver in-process. !!!!");
    Runnable child = new Runnable() {
      @Override
      public void run() {
        try {
          RSCDriverBootstrapper.main(new String[] { confFile.getAbsolutePath() });
        } catch (Exception e) {
          throw Utils.propagate(e);
        }
      }
    };
    return new ChildProcess(conf, promise, child, confFile);
  } else {
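    // Production path: build a real spark-submit invocation via SparkLauncher. The app
    // resource is NO_RESOURCE because the entry point is a main class rather than an
    // application jar, and the conf file written above doubles as the properties file.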
    final SparkLauncher launcher = new SparkLauncher();
    launcher.setSparkHome(System.getenv(SPARK_HOME_ENV));
    launcher.setAppResource(SparkLauncher.NO_RESOURCE);
    launcher.setPropertiesFile(confFile.getAbsolutePath());
    launcher.setMainClass(RSCDriverBootstrapper.class.getName());

    if (conf.get(PROXY_USER) != null) {
      launcher.addSparkArg("--proxy-user", conf.get(PROXY_USER));
    }

    return new ChildProcess(conf, promise, launcher.launch(), confFile);
  }
}
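
For reference, the merge(...) helper called above is defined elsewhere in ContextLauncher and is not part of this excerpt. A minimal sketch of its expected behavior, assuming a null-skipping join (the exact upstream implementation may differ):

// Sketch only, not the upstream source: prepends the Livy-provided value to any
// existing value for `key`, skipping null/empty pieces, and writes the result back.
private static void merge(RSCConf conf, String key, String livyConf, String sep) {
  if (livyConf == null || livyConf.isEmpty()) {
    return;  // nothing from Livy to merge in; leave the user's value untouched
  }
  String existing = conf.get(key);
  String merged = (existing == null || existing.isEmpty())
    ? livyConf
    : livyConf + sep + existing;  // Livy's value first, then the user-provided one
  conf.set(key, merged);
}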