in repl/src/main/scala/org/apache/livy/repl/SparkRInterpreter.scala [74:153]
/**
 * Starts a SparkR backend server on a daemon thread, launches an R process configured to
 * connect to it, and wraps both in a [[SparkRInterpreter]].
 *
 * The backend class `org.apache.spark.api.r.RBackend` is loaded and driven reflectively.
 * The wait for its initialization is bounded by the `SPARKR_BACKEND_TIMEOUT` environment
 * variable (seconds, default 120).
 *
 * @param conf    Spark configuration; read for `spark.r.shell.command`, `spark.master`,
 *                `spark.submit.deployMode` and `spark.livy.spark_major_version`.
 * @param entries stored in `sparkEntries` before anything else happens.
 * @return the interpreter wrapping the started R process and the backend thread.
 * @throws IllegalStateException if the backend does not report initialization within the
 *                               timeout, or if `init` returns an unrecognised value.
 * @throws IllegalArgumentException (via `require`) if the SparkR package directory does not
 *                                  exist outside of test mode.
 */
def apply(conf: SparkConf, entries: SparkEntries): SparkRInterpreter = {
  sparkEntries = entries
  val backendTimeout = sys.env.getOrElse("SPARKR_BACKEND_TIMEOUT", "120").toInt
  val mirror = universe.runtimeMirror(getClass.getClassLoader)
  val sparkRBackendClass = mirror.classLoader.loadClass("org.apache.spark.api.r.RBackend")
  val backendInstance = sparkRBackendClass.getDeclaredConstructor().newInstance()

  // Invokes RBackend.init() exactly once and normalises its result to (port, secret).
  // Two result shapes are accepted: a bare Int port, or a (port, auth helper) tuple whose
  // helper exposes a `secret` method. Inspecting the returned value (instead of casting,
  // catching the failure and calling init() again) avoids initializing the backend twice.
  def initBackend(): (Int, Option[String]) =
    sparkRBackendClass.getMethod("init").invoke(backendInstance) match {
      case port: java.lang.Integer =>
        (port.intValue, Option.empty[String])
      case (port: Int, authHelper) =>
        // Best effort: a helper without a usable `secret` simply yields no secret.
        val secret = Try {
          val helper = authHelper.asInstanceOf[AnyRef]
          helper.getClass.getMethod("secret").invoke(helper).asInstanceOf[String]
        }.toOption.flatMap(Option(_))
        (port, secret)
      case other =>
        throw new IllegalStateException(s"Unexpected result from RBackend.init(): $other")
    }

  // Outcome of initBackend(); assigned by the backend thread before it releases
  // `initialized`, and only read here after a successful acquire.
  var backendInit: Option[Try[(Int, Option[String])]] = None
  val initialized = new Semaphore(0)

  // Launch a SparkR backend server for the R process to connect to
  val backendThread = new Thread("SparkR backend") {
    override def run(): Unit = {
      val outcome = Try(initBackend())
      backendInit = Some(outcome)
      // Always wake the caller, so an init failure is reported immediately rather than
      // after the full timeout.
      initialized.release()
      if (outcome.isSuccess) {
        sparkRBackendClass.getMethod("run").invoke(backendInstance)
      }
    }
  }
  backendThread.setDaemon(true)
  backendThread.start()

  try {
    // Wait for RBackend initialization to finish; do not continue with an unset port.
    if (!initialized.tryAcquire(backendTimeout, TimeUnit.SECONDS)) {
      throw new IllegalStateException(
        s"SparkR backend did not initialize in $backendTimeout seconds")
    }
    val (sparkRBackendPort, sparkRBackendSecret) = backendInit match {
      case Some(outcome) => outcome.get // rethrows the init failure, if any
      case None =>
        throw new IllegalStateException("SparkR backend reported no initialization result")
    }

    val rExec = conf.getOption("spark.r.shell.command")
      .orElse(sys.env.get("SPARKR_DRIVER_R"))
      .getOrElse("R")

    val useRelativeSparkrDir = sys.env.getOrElse("SPARK_YARN_MODE", "") == "true" ||
      (conf.get("spark.master", "").toLowerCase == "yarn" &&
        conf.get("spark.submit.deployMode", "").toLowerCase == "cluster")
    val packageDir =
      if (useRelativeSparkrDir) {
        "./sparkr"
      } else {
        // local mode
        val rLibPath = new File(sys.env.getOrElse("SPARKR_PACKAGE_DIR",
          Seq(sys.env.getOrElse("SPARK_HOME", "."), "R", "lib").mkString(File.separator)))
        if (ClientConf.TEST_MODE) {
          // In test mode the directory is neither checked nor used.
          ""
        } else {
          require(rLibPath.exists(), "Cannot find sparkr package directory.")
          rLibPath.getAbsolutePath()
        }
      }

    val builder = new ProcessBuilder(Seq(rExec, "--slave @").asJava)
    val env = builder.environment()
    env.put("SPARK_HOME", sys.env.getOrElse("SPARK_HOME", "."))
    env.put("EXISTING_SPARKR_BACKEND_PORT", sparkRBackendPort.toString)
    sparkRBackendSecret.foreach(secret => env.put("SPARKR_BACKEND_AUTH_SECRET", secret))
    env.put("SPARKR_PACKAGE_DIR", packageDir)
    env.put("R_PROFILE_USER",
      Seq(packageDir, "SparkR", "profile", "general.R").mkString(File.separator))
    builder.redirectErrorStream(true)
    val process = builder.start()

    new SparkRInterpreter(process, backendInstance, backendThread,
      conf.getInt("spark.livy.spark_major_version", 1), sparkRBackendSecret.isDefined)
  } catch {
    case e: Exception =>
      // Stop the backend thread before propagating the failure to the caller.
      backendThread.interrupt()
      throw e
  }
}