in zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java [62:267]
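/**
 * Builds the environment used to launch the Spark interpreter process:
 * translates interpreter properties into spark-submit --conf arguments,
 * propagates SPARK_HOME/HADOOP_CONF_DIR style variables, and handles the
 * yarn-cluster specific packaging of interpreter jars.
 */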
public Map<String, String> buildEnvFromProperties(InterpreterLaunchContext context) throws IOException {
Map<String, String> env = super.buildEnvFromProperties(context);
Properties sparkProperties = new Properties();
String spMaster = getSparkMaster(context);
if (spMaster != null) {
sparkProperties.put(SPARK_MASTER_KEY, spMaster);
}
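// Walk all interpreter properties: env-style keys are exported into the
// process environment, Spark properties are collected for spark-submit.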
Properties properties = context.getProperties();
for (String key : properties.stringPropertyNames()) {
String propValue = properties.getProperty(key);
if (RemoteInterpreterUtils.isEnvString(key) && !StringUtils.isBlank(propValue)) {
env.put(key, propValue);
}
if (isSparkConf(key, propValue)) {
// The launch command already carries an initial value after --driver-java-options.
// Entries in sparkProperties are passed via --conf, and
// --conf spark.driver.extraJavaOptions would conflict with --driver-java-options.
// Therefore the value of spark.driver.extraJavaOptions is appended after
// --driver-java-options instead of being added to sparkProperties.
if (Objects.equals("spark.driver.extraJavaOptions", key)) {
env.put("SPARK_DRIVER_EXTRAJAVAOPTIONS_CONF", (String) properties.remove(key));
continue;
}
sparkProperties.setProperty(key, propValue);
}
}
// Default spark.app.name to the interpreter group id when it is unset or blank.
// StringUtils.isBlank(null) is true, so one check also covers the missing-key case.
if (StringUtils.isBlank(sparkProperties.getProperty("spark.app.name"))) {
sparkProperties.setProperty("spark.app.name", context.getInterpreterGroupId());
}
setupPropertiesForPySpark(sparkProperties, context);
setupPropertiesForSparkR(sparkProperties, context);
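// A conda env, supported only in yarn-cluster mode, overrides the Python
// binary used by PySpark.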
String condaEnvName = context.getProperties().getProperty("zeppelin.interpreter.conda.env.name");
if (StringUtils.isNotBlank(condaEnvName)) {
if (!isYarnCluster(context)) {
throw new IOException("zeppelin.interpreter.conda.env.name only works for yarn-cluster mode");
}
sparkProperties.setProperty("spark.pyspark.python", condaEnvName + "/bin/python");
}
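// In yarn-cluster mode the driver runs inside YARN: spark-submit returns
// immediately, and Zeppelin must not force-shutdown the remote interpreter.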
if (isYarnCluster(context)) {
env.put("ZEPPELIN_SPARK_YARN_CLUSTER", "true");
sparkProperties.setProperty("spark.yarn.submit.waitAppCompletion", "false");
// `zeppelin.interpreter.forceShutdown` must be set on the interpreter properties
// directly rather than on sparkProperties, because it is read by
// RemoteInterpreterServer before the SparkInterpreter is created.
context.getProperties().put("zeppelin.interpreter.forceShutdown", "false");
} else if (zConf.isOnlyYarnCluster()) {
throw new IOException("Only yarn-cluster mode is allowed, please set " +
ZeppelinConfiguration.ConfVars.ZEPPELIN_SPARK_ONLY_YARN_CLUSTER.getVarName() +
" to false if you want to use other modes.");
}
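// Ship a yarn-cluster specific log4j configuration with the application via spark.files.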
if (isYarnMode(context) && getDeployMode(context).equals("cluster")) {
if (sparkProperties.containsKey("spark.files")) {
sparkProperties.put("spark.files", sparkProperties.getProperty("spark.files") + "," +
zConf.getConfDir() + "/log4j_yarn_cluster.properties");
} else {
sparkProperties.put("spark.files", zConf.getConfDir() + "/log4j_yarn_cluster.properties");
}
sparkProperties.put("spark.yarn.maxAppAttempts", "1");
}
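// Detect the Scala version of the Spark distribution so the matching
// interpreter jars (interpreter/spark/scala-<version>) can be picked below.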
String scalaVersion = null;
try {
String sparkHome = getEnv("SPARK_HOME", context);
LOGGER.info("SPARK_HOME: {}", sparkHome);
scalaVersion = detectSparkScalaVersion(sparkHome, env);
LOGGER.info("Scala version for Spark: {}", scalaVersion);
context.getProperties().put("zeppelin.spark.scala.version", scalaVersion);
} catch (Exception e) {
// chain the cause so the original stack trace is preserved
throw new IOException("Failed to detect Scala version: " + e.getMessage(), e);
}
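// yarn-cluster mode: the interpreter runs inside the YARN application, so ship
// everything it needs via spark.jars: dependencies from the interpreter's local
// repo, the Scala-version-specific interpreter jars, and the single
// zeppelin-interpreter-shaded jar.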
if (isYarnMode(context)
&& getDeployMode(context).equals("cluster")) {
try {
List<String> additionalJars = new ArrayList<>();
Path localRepoPath =
Paths.get(zConf.getInterpreterLocalRepoPath(), context.getInterpreterSettingId());
if (Files.exists(localRepoPath) && Files.isDirectory(localRepoPath)) {
try (DirectoryStream<Path> localRepoStream = Files.newDirectoryStream(localRepoPath, Files::isRegularFile)) {
List<String> localRepoJars = StreamSupport.stream(localRepoStream.spliterator(), false)
.map(jar -> jar.toAbsolutePath().toString())
.collect(Collectors.toList());
additionalJars.addAll(localRepoJars);
}
}
Path scalaFolder = Paths.get(zConf.getZeppelinHome(), "/interpreter/spark/scala-" + scalaVersion);
if (!scalaFolder.toFile().exists()) {
throw new IOException("spark scala folder " + scalaFolder.toFile() + " doesn't exist");
}
try (DirectoryStream<Path> scalaStream = Files.newDirectoryStream(scalaFolder, Files::isRegularFile)) {
List<String> scalaJars = StreamSupport.stream(scalaStream.spliterator(), false)
.map(jar -> jar.toAbsolutePath().toString())
.collect(Collectors.toList());
additionalJars.addAll(scalaJars);
}
// add zeppelin-interpreter-shaded
Path interpreterFolder = Paths.get(zConf.getZeppelinHome(), "/interpreter");
try (DirectoryStream<Path> interpreterStream = Files.newDirectoryStream(interpreterFolder, Files::isRegularFile)) {
List<String> interpreterJars = StreamSupport.stream(interpreterStream.spliterator(), false)
.filter(jar -> jar.toFile().getName().startsWith("zeppelin-interpreter-shaded")
&& jar.toFile().getName().endsWith(".jar"))
.map(jar -> jar.toAbsolutePath().toString())
.collect(Collectors.toList());
if (interpreterJars.isEmpty()) {
throw new IOException("zeppelin-interpreter-shaded jar is not found");
} else if (interpreterJars.size() > 1) {
throw new IOException("more than one zeppelin-interpreter-shaded jar found: "
+ StringUtils.join(interpreterJars, ","));
}
additionalJars.addAll(interpreterJars);
}
if (sparkProperties.containsKey("spark.jars")) {
sparkProperties.put("spark.jars", sparkProperties.getProperty("spark.jars") + "," +
StringUtils.join(additionalJars, ","));
} else {
sparkProperties.put("spark.jars", StringUtils.join(additionalJars, ","));
}
} catch (Exception e) {
throw new IOException("Fail to set additional jars for spark interpreter", e);
}
}
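// Build the spark-submit argument list. Entries are joined with '|' into
// ZEPPELIN_SPARK_CONF for the interpreter launch script to split back into
// individual arguments.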
StringJoiner sparkConfSJ = new StringJoiner("|");
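// With impersonation enabled, submit as the end user via --proxy-user.
// spark-submit rejects --proxy-user combined with a keytab/principal, so
// drop them here.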
if (context.getOption().isUserImpersonate() && zConf.getZeppelinImpersonateSparkProxyUser()) {
sparkConfSJ.add("--proxy-user");
sparkConfSJ.add(context.getUserName());
sparkProperties.remove("spark.yarn.keytab");
sparkProperties.remove("spark.yarn.principal");
}
for (String name : sparkProperties.stringPropertyNames()) {
sparkConfSJ.add("--conf");
sparkConfSJ.add(name + "=" + sparkProperties.getProperty(name));
}
env.put("ZEPPELIN_SPARK_CONF", sparkConfSJ.toString());
// Resolve these env variables in order of precedence:
// 1. interpreter setting
// 2. zeppelin-env.sh
// Setting them in the interpreter setting is encouraged; the fallback to
// zeppelin-env.sh exists only for backward compatibility.
for (String envName : new String[]{"SPARK_HOME", "SPARK_CONF_DIR", "HADOOP_CONF_DIR"}) {
String envValue = getEnv(envName, context);
if (!StringUtils.isBlank(envValue)) {
env.put(envName, envValue);
}
}
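// Kerberos credentials: prefer per-interpreter spark.yarn.keytab/principal,
// then fall back to the server-wide Zeppelin keytab and principal.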
String keytab = properties.getProperty("spark.yarn.keytab",
zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_SERVER_KERBEROS_KEYTAB));
String principal = properties.getProperty("spark.yarn.principal",
zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_SERVER_KERBEROS_PRINCIPAL));
if (!StringUtils.isBlank(keytab) && !StringUtils.isBlank(principal)) {
env.put("ZEPPELIN_SERVER_KERBEROS_KEYTAB", keytab);
env.put("ZEPPELIN_SERVER_KERBEROS_PRINCIPAL", principal);
LOGGER.info("Run Spark under secure mode with keytab: {}, principal: {}",keytab, principal);
} else {
LOGGER.info("Run Spark under non-secure mode as no keytab and principal is specified");
}
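// Pinned thread mode (Spark 3.0+): map each Python thread to a dedicated JVM thread.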
env.put("PYSPARK_PIN_THREAD", "true");
// Derive ZEPPELIN_INTP_CLASSPATH from spark.driver.extraClassPath in
// spark-defaults.conf so the interpreter JVM sees the same extra classpath
// as the Spark driver.
String sparkConfDir = getEnv("SPARK_CONF_DIR", context);
if (StringUtils.isBlank(sparkConfDir)) {
String sparkHome = getEnv("SPARK_HOME", context);
sparkConfDir = sparkHome + "/conf";
}
Properties sparkDefaultProperties = new Properties();
File sparkDefaultFile = new File(sparkConfDir, "spark-defaults.conf");
if (sparkDefaultFile.exists()) {
// try-with-resources so the stream is closed even if load() throws
try (FileInputStream in = new FileInputStream(sparkDefaultFile)) {
sparkDefaultProperties.load(in);
}
String driverExtraClassPath = sparkDefaultProperties.getProperty("spark.driver.extraClassPath");
if (!StringUtils.isBlank(driverExtraClassPath)) {
env.put("ZEPPELIN_INTP_CLASSPATH", driverExtraClassPath);
}
} else {
LOGGER.warn("spark-defaults.conf doesn't exist: {}", sparkDefaultFile.getAbsolutePath());
}
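// On YARN, run the application as the login user via HADOOP_USER_NAME,
// unless disabled or the user is anonymous.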
if (isYarnMode(context)) {
boolean runAsLoginUser = Boolean.parseBoolean(context
.getProperties()
.getProperty("zeppelin.spark.run.asLoginUser", "true"));
String userName = context.getUserName();
if (runAsLoginUser && !"anonymous".equals(userName)) {
env.put("HADOOP_USER_NAME", userName);
}
}
LOGGER.info("buildEnvFromProperties: {}", env);
return env;
}