in server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala [199:407]
private[interactive] def prepareBuilderProp(
conf: Map[String, String],
kind: Kind,
livyConf: LivyConf): mutable.Map[String, String] = {
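// Start from the user-supplied session conf and layer Livy's derived settings on top of it.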
val builderProperties = mutable.Map[String, String]()
builderProperties ++= conf
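// Resolve the Livy REPL jars for the given Scala version, either from LivyConf.REPL_JARS
// or from the jars bundled under LIVY_HOME.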
def livyJars(livyConf: LivyConf, scalaVersion: String): List[String] = {
Option(livyConf.get(LivyConf.REPL_JARS)).map { jars =>
val regex = """[\w-]+_(\d\.\d\d).*\.jar""".r
jars.split(",").filter { name => new Path(name).getName match {
// Filter out Scala jars built against a different Scala version
case regex(ver) => ver == scalaVersion
// Keep all plain Java jars that end with ".jar"
case _ => name.endsWith(".jar")
}
}.toList
}.getOrElse {
val home = sys.env("LIVY_HOME")
val jars = Option(new File(home, s"repl_$scalaVersion-jars"))
.filter(_.isDirectory())
.getOrElse(new File(home, s"repl/scala-$scalaVersion/target/jars"))
require(jars.isDirectory(), "Cannot find Livy REPL jars.")
jars.listFiles().map(_.getAbsolutePath()).toList
}
}
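// Locate the sparkr.zip archive to ship with the session, either from the RSC conf
// or from $SPARK_HOME/R/lib.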
def findSparkRArchive(): Option[String] = {
Option(livyConf.get(RSCConf.Entry.SPARKR_PACKAGE.key())).orElse {
sys.env.get("SPARK_HOME").flatMap { case sparkHome =>
val path = Seq(sparkHome, "R", "lib", "sparkr.zip").mkString(File.separator)
val rArchivesFile = new File(path)
if (rArchivesFile.exists()) {
Some(rArchivesFile.getAbsolutePath)
} else {
warn("sparkr.zip not found; cannot start R interpreter.")
None
}
}
}
}
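// Collect the datanucleus jars (needed for Hive metastore access) from the Spark distribution.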
def datanucleusJars(livyConf: LivyConf, sparkMajorVersion: Int): Seq[String] = {
if (sys.env.getOrElse("LIVY_INTEGRATION_TEST", "false").toBoolean) {
// The datanucleus jars are already on the classpath in integration tests
Seq.empty
} else {
val sparkHome = livyConf.sparkHome().get
val libdir = sparkMajorVersion match {
case 2 | 3 =>
if (new File(sparkHome, "RELEASE").isFile) {
new File(sparkHome, "jars")
} else if (new File(sparkHome, "assembly/target/scala-2.11/jars").isDirectory) {
new File(sparkHome, "assembly/target/scala-2.11/jars")
} else {
new File(sparkHome, "assembly/target/scala-2.12/jars")
}
case _ =>
throw new RuntimeException(s"Unsupported Spark major version: $sparkMajorVersion")
}
val jars = if (!libdir.isDirectory) {
Seq.empty[String]
} else {
libdir.listFiles().filter(_.getName.startsWith("datanucleus-"))
.map(_.getAbsolutePath).toSeq
}
if (jars.isEmpty) {
warn("datanucleus jars can not be found")
}
jars
}
}
/**
* Look for hive-site.xml (for now, spark.files defined in spark-defaults.conf is ignored):
* 1. First look for hive-site.xml in the user request (spark.files).
* 2. Then look for it on the Livy server classpath.
* @param sparkFiles files listed in the user's spark.files configuration
* @param livyConf the Livy server configuration
* @return (hive-site.xml path, whether it was provided by the user)
*/
def hiveSiteFile(sparkFiles: Array[String], livyConf: LivyConf): (Option[File], Boolean) = {
if (sparkFiles.exists(_.split("/").last == "hive-site.xml")) {
(None, true)
} else {
val hiveSiteURL = getClass.getResource("/hive-site.xml")
if (hiveSiteURL != null && hiveSiteURL.getProtocol == "file") {
(Some(new File(hiveSiteURL.toURI)), false)
} else {
(None, false)
}
}
}
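// Locate pyspark.zip and the py4j source zip that the PySpark interpreter needs,
// either from the RSC conf or from $SPARK_HOME/python/lib.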
def findPySparkArchives(): Seq[String] = {
Option(livyConf.get(RSCConf.Entry.PYSPARK_ARCHIVES))
.map(_.split(",").toSeq)
.getOrElse {
sys.env.get("SPARK_HOME") .map { case sparkHome =>
val pyLibPath = Seq(sparkHome, "python", "lib").mkString(File.separator)
val pyArchivesFile = new File(pyLibPath, "pyspark.zip")
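// Find the py4j-*-src.zip shipped with Spark; the directory stream must be closed
// even when no matching file is found.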
val py4jZip = Files.newDirectoryStream(Paths.get(pyLibPath), "py4j-*-src.zip")
val py4jFile: File =
try {
py4jZip.iterator().next().toFile
} finally {
py4jZip.close()
}
if (!pyArchivesFile.exists()) {
warn("pyspark.zip not found; cannot start pyspark interpreter.")
Seq.empty
} else if (!py4jFile.exists()) {
warn("py4j-*-src.zip not found; can start pyspark interpreter.")
Seq.empty
} else {
Seq(pyArchivesFile.getAbsolutePath, py4jFile.getAbsolutePath)
}
}.getOrElse(Seq())
}
}
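// Append the given values to a comma-separated config entry instead of overwriting
// any existing value.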
def mergeConfList(list: Seq[String], key: String): Unit = {
if (list.nonEmpty) {
builderProperties.get(key) match {
case None =>
builderProperties.put(key, list.mkString(","))
case Some(oldList) =>
val newList = (oldList :: list.toList).mkString(",")
builderProperties.put(key, newList)
}
}
}
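// When HiveContext is enabled, ship hive-site.xml (unless the user already provided one)
// and the datanucleus jars to the cluster.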
def mergeHiveSiteAndHiveDeps(sparkMajorVersion: Int): Unit = {
val sparkFiles = conf.get("spark.files").map(_.split(",")).getOrElse(Array.empty[String])
hiveSiteFile(sparkFiles, livyConf) match {
case (_, true) =>
debug("Enable HiveContext because hive-site.xml is found in user request.")
mergeConfList(datanucleusJars(livyConf, sparkMajorVersion), LivyConf.SPARK_JARS)
case (Some(file), false) =>
debug("Enable HiveContext because hive-site.xml is found under classpath, "
+ file.getAbsolutePath)
mergeConfList(List(file.getAbsolutePath), LivyConf.SPARK_FILES)
mergeConfList(datanucleusJars(livyConf, sparkMajorVersion), LivyConf.SPARK_JARS)
case (None, false) =>
warn("Enable HiveContext but no hive-site.xml found under" +
" classpath or user request.")
}
}
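// PySpark and SparkR archives are only resolved when not running in test mode.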
val pySparkFiles = if (!LivyConf.TEST_MODE) {
findPySparkArchives()
} else {
Nil
}
if (pySparkFiles.nonEmpty) {
builderProperties.put(SPARK_YARN_IS_PYTHON, "true")
}
mergeConfList(pySparkFiles, LivyConf.SPARK_PY_FILES)
val sparkRArchive = if (!LivyConf.TEST_MODE) findSparkRArchive() else None
sparkRArchive.foreach { archive =>
builderProperties.put(RSCConf.Entry.SPARKR_PACKAGE.key(), archive + "#sparkr")
}
builderProperties.put(RSCConf.Entry.SESSION_KIND.key, kind.toString)
// Copy livy.rsc.jars from the Livy conf into the RSC conf; a value already present in the
// session conf takes precedence if both are set.
Option(livyConf.get(LivyConf.RSC_JARS)).foreach(
builderProperties.getOrElseUpdate(RSCConf.Entry.LIVY_JARS.key(), _))
require(livyConf.get(LivyConf.LIVY_SPARK_VERSION) != null)
require(livyConf.get(LivyConf.LIVY_SPARK_SCALA_VERSION) != null)
val (sparkMajorVersion, _) =
LivySparkUtils.formatSparkVersion(livyConf.get(LivyConf.LIVY_SPARK_VERSION))
val scalaVersion = livyConf.get(LivyConf.LIVY_SPARK_SCALA_VERSION)
mergeConfList(livyJars(livyConf, scalaVersion), LivyConf.SPARK_JARS)
val enableHiveContext = livyConf.getBoolean(LivyConf.ENABLE_HIVE_CONTEXT)
// Pass spark.livy.spark_major_version to the driver.
builderProperties.put("spark.livy.spark_major_version", sparkMajorVersion.toString)
val confVal = if (enableHiveContext) "hive" else "in-memory"
builderProperties.put("spark.sql.catalogImplementation", confVal)
if (enableHiveContext) {
mergeHiveSiteAndHiveDeps(sparkMajorVersion)
}
// Pick all the RSC-specific configs that have not been explicitly set otherwise, and
// put them in the resulting properties, so that the remote driver can use them.
livyConf.iterator().asScala.foreach { e =>
val (key, value) = (e.getKey(), e.getValue())
if (key.startsWith(RSCConf.RSC_CONF_PREFIX) && !builderProperties.contains(key)) {
builderProperties(key) = value
}
}
builderProperties
}