private[interactive] def prepareBuilderProp()

in server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala [199:407]


  private[interactive] def prepareBuilderProp(
    conf: Map[String, String],
    kind: Kind,
    livyConf: LivyConf): mutable.Map[String, String] = {

    val builderProperties = mutable.Map[String, String]()
    builderProperties ++= conf

    def livyJars(livyConf: LivyConf, scalaVersion: String): List[String] = {
      Option(livyConf.get(LivyConf.REPL_JARS)).map { jars =>
        val regex = """[\w-]+_(\d\.\d\d).*\.jar""".r
        jars.split(",").filter { name => new Path(name).getName match {
            // Filter out unmatched scala jars
            case regex(ver) => ver == scalaVersion
            // Keep all the java jars end with ".jar"
            case _ => name.endsWith(".jar")
          }
        }.toList
      }.getOrElse {
        val home = sys.env("LIVY_HOME")
        val jars = Option(new File(home, s"repl_$scalaVersion-jars"))
          .filter(_.isDirectory())
          .getOrElse(new File(home, s"repl/scala-$scalaVersion/target/jars"))
        require(jars.isDirectory(), "Cannot find Livy REPL jars.")
        jars.listFiles().map(_.getAbsolutePath()).toList
      }
    }

    def findSparkRArchive(): Option[String] = {
      Option(livyConf.get(RSCConf.Entry.SPARKR_PACKAGE.key())).orElse {
        sys.env.get("SPARK_HOME").flatMap { case sparkHome =>
          val path = Seq(sparkHome, "R", "lib", "sparkr.zip").mkString(File.separator)
          val rArchivesFile = new File(path)
          if (rArchivesFile.exists()) {
            Some(rArchivesFile.getAbsolutePath)
          } else {
            warn("sparkr.zip not found; cannot start R interpreter.")
            None
          }
        }
      }
    }

    def datanucleusJars(livyConf: LivyConf, sparkMajorVersion: Int): Seq[String] = {
      if (sys.env.getOrElse("LIVY_INTEGRATION_TEST", "false").toBoolean) {
        // datanucleus jars has already been in classpath in integration test
        Seq.empty
      } else {
        val sparkHome = livyConf.sparkHome().get
        val libdir = sparkMajorVersion match {
          case 2 | 3 =>
            if (new File(sparkHome, "RELEASE").isFile) {
              new File(sparkHome, "jars")
            } else if (new File(sparkHome, "assembly/target/scala-2.11/jars").isDirectory) {
              new File(sparkHome, "assembly/target/scala-2.11/jars")
            } else {
              new File(sparkHome, "assembly/target/scala-2.12/jars")
            }
          case v =>
            throw new RuntimeException(s"Unsupported Spark major version: $sparkMajorVersion")
        }
        val jars = if (!libdir.isDirectory) {
          Seq.empty[String]
        } else {
          libdir.listFiles().filter(_.getName.startsWith("datanucleus-"))
            .map(_.getAbsolutePath).toSeq
        }
        if (jars.isEmpty) {
          warn("datanucleus jars can not be found")
        }
        jars
      }
    }

    /**
     * Look for hive-site.xml (for now just ignore spark.files defined in spark-defaults.conf)
     * 1. First look for hive-site.xml in user request
     * 2. Then look for that under classpath
     * @param livyConf
     * @return  (hive-site.xml path, whether it is provided by user)
     */
    def hiveSiteFile(sparkFiles: Array[String], livyConf: LivyConf): (Option[File], Boolean) = {
      if (sparkFiles.exists(_.split("/").last == "hive-site.xml")) {
        (None, true)
      } else {
        val hiveSiteURL = getClass.getResource("/hive-site.xml")
        if (hiveSiteURL != null && hiveSiteURL.getProtocol == "file") {
          (Some(new File(hiveSiteURL.toURI)), false)
        } else {
          (None, false)
        }
      }
    }

    def findPySparkArchives(): Seq[String] = {
      Option(livyConf.get(RSCConf.Entry.PYSPARK_ARCHIVES))
        .map(_.split(",").toSeq)
        .getOrElse {
          sys.env.get("SPARK_HOME") .map { case sparkHome =>
            val pyLibPath = Seq(sparkHome, "python", "lib").mkString(File.separator)
            val pyArchivesFile = new File(pyLibPath, "pyspark.zip")
            var py4jZip: DirectoryStream[java.nio.file.Path] = null;
            var py4jFile: File = null;
            try {
              py4jZip = Files.newDirectoryStream(Paths.get(pyLibPath), "py4j-*-src.zip")
              py4jFile = py4jZip.iterator()
                .next()
                .toFile
            }
            finally {
              if (py4jZip != null) {
                py4jZip.close()
              }
            }
            if (!pyArchivesFile.exists()) {
              warn("pyspark.zip not found; cannot start pyspark interpreter.")
              Seq.empty
            } else if (!py4jFile.exists()) {
              warn("py4j-*-src.zip not found; can start pyspark interpreter.")
              Seq.empty
            } else {
              Seq(pyArchivesFile.getAbsolutePath, py4jFile.getAbsolutePath)
            }
          }.getOrElse(Seq())
        }
    }

    def mergeConfList(list: Seq[String], key: String): Unit = {
      if (list.nonEmpty) {
        builderProperties.get(key) match {
          case None =>
            builderProperties.put(key, list.mkString(","))
          case Some(oldList) =>
            val newList = (oldList :: list.toList).mkString(",")
            builderProperties.put(key, newList)
        }
      }
    }

    def mergeHiveSiteAndHiveDeps(sparkMajorVersion: Int): Unit = {
      val sparkFiles = conf.get("spark.files").map(_.split(",")).getOrElse(Array.empty[String])
      hiveSiteFile(sparkFiles, livyConf) match {
        case (_, true) =>
          debug("Enable HiveContext because hive-site.xml is found in user request.")
          mergeConfList(datanucleusJars(livyConf, sparkMajorVersion), LivyConf.SPARK_JARS)
        case (Some(file), false) =>
          debug("Enable HiveContext because hive-site.xml is found under classpath, "
            + file.getAbsolutePath)
          mergeConfList(List(file.getAbsolutePath), LivyConf.SPARK_FILES)
          mergeConfList(datanucleusJars(livyConf, sparkMajorVersion), LivyConf.SPARK_JARS)
        case (None, false) =>
          warn("Enable HiveContext but no hive-site.xml found under" +
            " classpath or user request.")
      }
    }

    val pySparkFiles = if (!LivyConf.TEST_MODE) {
      findPySparkArchives()
    } else {
      Nil
    }

    if (pySparkFiles.nonEmpty) {
      builderProperties.put(SPARK_YARN_IS_PYTHON, "true")
    }

    mergeConfList(pySparkFiles, LivyConf.SPARK_PY_FILES)

    val sparkRArchive = if (!LivyConf.TEST_MODE) findSparkRArchive() else None
    sparkRArchive.foreach { archive =>
      builderProperties.put(RSCConf.Entry.SPARKR_PACKAGE.key(), archive + "#sparkr")
    }

    builderProperties.put(RSCConf.Entry.SESSION_KIND.key, kind.toString)

    // Set Livy.rsc.jars from livy conf to rsc conf, RSC conf will take precedence if both are set.
    Option(livyConf.get(LivyConf.RSC_JARS)).foreach(
      builderProperties.getOrElseUpdate(RSCConf.Entry.LIVY_JARS.key(), _))

    require(livyConf.get(LivyConf.LIVY_SPARK_VERSION) != null)
    require(livyConf.get(LivyConf.LIVY_SPARK_SCALA_VERSION) != null)

    val (sparkMajorVersion, _) =
      LivySparkUtils.formatSparkVersion(livyConf.get(LivyConf.LIVY_SPARK_VERSION))
    val scalaVersion = livyConf.get(LivyConf.LIVY_SPARK_SCALA_VERSION)

    mergeConfList(livyJars(livyConf, scalaVersion), LivyConf.SPARK_JARS)
    val enableHiveContext = livyConf.getBoolean(LivyConf.ENABLE_HIVE_CONTEXT)
    // pass spark.livy.spark_major_version to driver
    builderProperties.put("spark.livy.spark_major_version", sparkMajorVersion.toString)

    val confVal = if (enableHiveContext) "hive" else "in-memory"
    builderProperties.put("spark.sql.catalogImplementation", confVal)

    if (enableHiveContext) {
      mergeHiveSiteAndHiveDeps(sparkMajorVersion)
    }

    // Pick all the RSC-specific configs that have not been explicitly set otherwise, and
    // put them in the resulting properties, so that the remote driver can use them.
    livyConf.iterator().asScala.foreach { e =>
      val (key, value) = (e.getKey(), e.getValue())
      if (key.startsWith(RSCConf.RSC_CONF_PREFIX) && !builderProperties.contains(key)) {
        builderProperties(key) = value
      }
    }

    builderProperties
  }