public Map<String, String> buildEnvFromProperties(InterpreterLaunchContext context)

in zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java [62:267]


  public Map<String, String> buildEnvFromProperties(InterpreterLaunchContext context) throws IOException {
    Map<String, String> env = super.buildEnvFromProperties(context);
    Properties sparkProperties = new Properties();
    String spMaster = getSparkMaster(context);
    if (spMaster != null) {
      sparkProperties.put(SPARK_MASTER_KEY, spMaster);
    }
    Properties properties = context.getProperties();
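    // Copy env-style properties into the process env and collect Spark configuration
    // entries into sparkProperties so they can later be passed to spark-submit via --conf.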
    for (String key : properties.stringPropertyNames()) {
      String propValue = properties.getProperty(key);
      if (RemoteInterpreterUtils.isEnvString(key) && !StringUtils.isBlank(propValue)) {
        env.put(key, propValue);
      }
      if (isSparkConf(key, propValue)) {
        // There is already an initial value following --driver-java-options when SparkInterpreter launches.
        // Values in sparkProperties are passed via --conf, and
        // --conf spark.driver.extraJavaOptions would conflict with --driver-java-options.
        // Therefore we append the value of spark.driver.extraJavaOptions to --driver-java-options
        // instead of putting it into sparkProperties.
        if (Objects.equals("spark.driver.extraJavaOptions", key)) {
          env.put("SPARK_DRIVER_EXTRAJAVAOPTIONS_CONF", (String) properties.remove(key));
          continue;
        }
        sparkProperties.setProperty(key, propValue);
      }
    }

    // set spark.app.name if it is not set or empty
    if (!sparkProperties.containsKey("spark.app.name") ||
            StringUtils.isBlank(sparkProperties.getProperty("spark.app.name"))) {
      sparkProperties.setProperty("spark.app.name", context.getInterpreterGroupId());
    }

    setupPropertiesForPySpark(sparkProperties, context);
    setupPropertiesForSparkR(sparkProperties, context);

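    // A conda env (yarn-cluster only) overrides the python binary used by PySpark.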
    String condaEnvName = context.getProperties().getProperty("zeppelin.interpreter.conda.env.name");
    if (StringUtils.isNotBlank(condaEnvName)) {
      if (!isYarnCluster(context)) {
        throw new IOException("zeppelin.interpreter.conda.env.name only works for yarn-cluster mode");
      }
      sparkProperties.setProperty("spark.pyspark.python", condaEnvName + "/bin/python");
    }

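    // In yarn-cluster mode, don't block spark-submit on application completion and
    // disable forced shutdown of the remote interpreter process.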
    if (isYarnCluster(context)) {
      env.put("ZEPPELIN_SPARK_YARN_CLUSTER", "true");
      sparkProperties.setProperty("spark.yarn.submit.waitAppCompletion", "false");
      // `zeppelin.interpreter.forceShutdown` must be set in the interpreter properties directly
      // instead of in sparkProperties, because it is initialized in RemoteInterpreterServer
      // before SparkInterpreter is created.
      context.getProperties().put("zeppelin.interpreter.forceShutdown", "false");
    } else if (zConf.isOnlyYarnCluster()) {
      throw new IOException("Only yarn-cluster mode is allowed, please set " +
              ZeppelinConfiguration.ConfVars.ZEPPELIN_SPARK_ONLY_YARN_CLUSTER.getVarName() +
              " to false if you want to use other modes.");
    }

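    // Ship a dedicated log4j config for yarn-cluster drivers and don't retry the
    // application on failure.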
    if (isYarnMode(context) && getDeployMode(context).equals("cluster")) {
      if (sparkProperties.containsKey("spark.files")) {
        sparkProperties.put("spark.files", sparkProperties.getProperty("spark.files") + "," +
            zConf.getConfDir() + "/log4j_yarn_cluster.properties");
      } else {
        sparkProperties.put("spark.files", zConf.getConfDir() + "/log4j_yarn_cluster.properties");
      }
      sparkProperties.put("spark.yarn.maxAppAttempts", "1");
    }


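    // Detect the Scala version of the Spark distribution; it decides which
    // interpreter/spark/scala-<version> jars are shipped below.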
    String scalaVersion = null;
    try {
      String sparkHome = getEnv("SPARK_HOME", context);
      LOGGER.info("SPARK_HOME: {}", sparkHome);
      scalaVersion = detectSparkScalaVersion(sparkHome, env);
      LOGGER.info("Scala version for Spark: {}", scalaVersion);
      context.getProperties().put("zeppelin.spark.scala.version", scalaVersion);
    } catch (Exception e) {
      throw new IOException("Fail to detect scala version, the reason is:"+ e.getMessage());
    }

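    // In yarn-cluster mode the driver runs on a remote node, so ship the interpreter's
    // local-repo jars, the matching scala-<version> jars and the
    // zeppelin-interpreter-shaded jar via spark.jars.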
    if (isYarnMode(context)
      && getDeployMode(context).equals("cluster")) {
      try {
        List<String> additionalJars = new ArrayList<>();
        Path localRepoPath =
                Paths.get(zConf.getInterpreterLocalRepoPath(), context.getInterpreterSettingId());
        if (Files.exists(localRepoPath) && Files.isDirectory(localRepoPath)) {
          try (DirectoryStream<Path> localRepoStream = Files.newDirectoryStream(localRepoPath, Files::isRegularFile)) {
            List<String> localRepoJars = StreamSupport.stream(localRepoStream.spliterator(),
                  false)
                  .map(jar -> jar.toAbsolutePath().toString()).collect(Collectors.toList());
            additionalJars.addAll(localRepoJars);
          }
        }

        Path scalaFolder = Paths.get(zConf.getZeppelinHome(), "/interpreter/spark/scala-" + scalaVersion);
        if (!scalaFolder.toFile().exists()) {
          throw new IOException("spark scala folder " + scalaFolder.toFile() + " doesn't exist");
        }
        try (DirectoryStream<Path> scalaStream = Files.newDirectoryStream(scalaFolder, Files::isRegularFile)) {
          List<String> scalaJars = StreamSupport.stream(scalaStream.spliterator(),
                false)
                .map(jar -> jar.toAbsolutePath().toString()).collect(Collectors.toList());
          additionalJars.addAll(scalaJars);
        }
        // add zeppelin-interpreter-shaded
        Path interpreterFolder = Paths.get(zConf.getZeppelinHome(), "/interpreter");
        try (DirectoryStream<Path> interpreterStream = Files.newDirectoryStream(interpreterFolder, Files::isRegularFile)) {
          List<String> interpreterJars = StreamSupport.stream(interpreterStream.spliterator(),
                false)
                .filter(jar -> jar.toFile().getName().startsWith("zeppelin-interpreter-shaded")
                        && jar.toFile().getName().endsWith(".jar"))
                .map(jar -> jar.toAbsolutePath().toString())
                .collect(Collectors.toList());
          if (interpreterJars.isEmpty()) {
            throw new IOException("zeppelin-interpreter-shaded jar is not found");
          } else if (interpreterJars.size() > 1) {
            throw new IOException("more than 1 zeppelin-interpreter-shaded jars are found: "
                    + StringUtils.join(interpreterJars, ","));
          }
          additionalJars.addAll(interpreterJars);
        }

        if (sparkProperties.containsKey("spark.jars")) {
          sparkProperties.put("spark.jars", sparkProperties.getProperty("spark.jars") + "," +
                  StringUtils.join(additionalJars, ","));
        } else {
          sparkProperties.put("spark.jars", StringUtils.join(additionalJars, ","));
        }
      } catch (Exception e) {
        throw new IOException("Fail to set additional jars for spark interpreter", e);
      }
    }

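    // Assemble the spark-submit arguments (--proxy-user and one --conf per entry in
    // sparkProperties) into ZEPPELIN_SPARK_CONF, joined with "|".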
    StringJoiner sparkConfSJ = new StringJoiner("|");
    if (context.getOption().isUserImpersonate() && zConf.getZeppelinImpersonateSparkProxyUser()) {
      sparkConfSJ.add("--proxy-user");
      sparkConfSJ.add(context.getUserName());
      sparkProperties.remove("spark.yarn.keytab");
      sparkProperties.remove("spark.yarn.principal");
    }

    for (String name : sparkProperties.stringPropertyNames()) {
      sparkConfSJ.add("--conf");
      sparkConfSJ.add(name + "=" + sparkProperties.getProperty(name));
    }

    env.put("ZEPPELIN_SPARK_CONF", sparkConfSJ.toString());

    // Set these envs in the following order of precedence:
    // 1. interpreter setting
    // 2. zeppelin-env.sh
    // It is encouraged to set env vars in the interpreter setting, but for backward compatibility
    // we also fall back to zeppelin-env.sh if they are not specified in the interpreter setting.
    for (String envName : new String[]{"SPARK_HOME", "SPARK_CONF_DIR", "HADOOP_CONF_DIR"}) {
      String envValue = getEnv(envName, context);
      if (!StringUtils.isBlank(envValue)) {
        env.put(envName, envValue);
      }
    }

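    // Kerberos: prefer spark.yarn.keytab/principal from the interpreter properties and
    // fall back to the server-wide Zeppelin keytab/principal.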
    String keytab = properties.getProperty("spark.yarn.keytab",
            zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_SERVER_KERBEROS_KEYTAB));
    String principal = properties.getProperty("spark.yarn.principal",
            zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_SERVER_KERBEROS_PRINCIPAL));

    if (!StringUtils.isBlank(keytab) && !StringUtils.isBlank(principal)) {
      env.put("ZEPPELIN_SERVER_KERBEROS_KEYTAB", keytab);
      env.put("ZEPPELIN_SERVER_KERBEROS_PRINCIPAL", principal);
      LOGGER.info("Run Spark under secure mode with keytab: {}, principal: {}",keytab, principal);
    } else {
      LOGGER.info("Run Spark under non-secure mode as no keytab and principal is specified");
    }

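    // Use PySpark's pinned thread mode so each Python thread maps to its own JVM thread.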
    env.put("PYSPARK_PIN_THREAD", "true");

    // ZEPPELIN_INTP_CLASSPATH
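    // Propagate spark.driver.extraClassPath from spark-defaults.conf (resolved from
    // SPARK_CONF_DIR, falling back to $SPARK_HOME/conf) to the interpreter classpath.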
    String sparkConfDir = getEnv("SPARK_CONF_DIR", context);
    if (StringUtils.isBlank(sparkConfDir)) {
      String sparkHome = getEnv("SPARK_HOME", context);
      sparkConfDir = sparkHome + "/conf";
    }
    Properties sparkDefaultProperties = new Properties();
    File sparkDefaultFile = new File(sparkConfDir, "spark-defaults.conf");
    if (sparkDefaultFile.exists()) {
      try (FileInputStream in = new FileInputStream(sparkDefaultFile)) {
        sparkDefaultProperties.load(in);
      }
      String driverExtraClassPath = sparkDefaultProperties.getProperty("spark.driver.extraClassPath");
      if (!StringUtils.isBlank(driverExtraClassPath)) {
        env.put("ZEPPELIN_INTP_CLASSPATH", driverExtraClassPath);
      }
    } else {
      LOGGER.warn("spark-defaults.conf doesn't exist: {}", sparkDefaultFile.getAbsolutePath());
    }

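    // On yarn, set HADOOP_USER_NAME so the application is submitted as the login user
    // (unless zeppelin.spark.run.asLoginUser is disabled or the user is anonymous).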
    if (isYarnMode(context)) {
      boolean runAsLoginUser = Boolean.parseBoolean(context
              .getProperties()
              .getProperty("zeppelin.spark.run.asLoginUser", "true"));
      String userName = context.getUserName();
      if (runAsLoginUser && !"anonymous".equals(userName)) {
        env.put("HADOOP_USER_NAME", userName);
      }
    }
    LOGGER.info("buildEnvFromProperties: {}", env);
    return env;
  }
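
A minimal, hypothetical sketch of how the "|"-joined ZEPPELIN_SPARK_CONF value assembled above could be split back into individual spark-submit arguments. The helper name parseZeppelinSparkConf is an assumption for illustration only; the actual splitting is assumed to happen in the interpreter launch script, not in this class.

  // Illustrative only (assumed helper, not part of SparkInterpreterLauncher):
  // turns the "|"-joined ZEPPELIN_SPARK_CONF built by buildEnvFromProperties back into
  // spark-submit arguments, e.g. ["--proxy-user", "<user>", "--conf", "spark.app.name=...", ...].
  static List<String> parseZeppelinSparkConf(Map<String, String> env) {
    String joined = env.getOrDefault("ZEPPELIN_SPARK_CONF", "");
    return joined.isEmpty()
        ? java.util.Collections.<String>emptyList()
        : java.util.Arrays.asList(joined.split("\\|"));
  }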