def apply()

in repl/src/main/scala/org/apache/livy/repl/SparkRInterpreter.scala [74:153]


  /**
   * Creates a [[SparkRInterpreter]] by starting a SparkR `RBackend` server on a daemon
   * thread and forking an R process configured to connect to it.
   *
   * `RBackend` is loaded reflectively because its `init` contract differs across Spark
   * versions: older versions return the bound port as an `Int`, newer ones return a
   * `(port, RAuthHelper)` tuple whose helper carries the connection auth secret.
   *
   * @param conf Spark configuration; consulted for the R executable
   *             (`spark.r.shell.command`), deploy mode, and Spark major version.
   * @param entries shared Spark entry points, stored for later use by the interpreter.
   * @return a SparkRInterpreter wrapping the launched R process.
   * @throws IllegalStateException if the RBackend does not initialize within
   *         `SPARKR_BACKEND_TIMEOUT` (default 120) seconds.
   */
  def apply(conf: SparkConf, entries: SparkEntries): SparkRInterpreter = {
    sparkEntries = entries
    val backendTimeout = sys.env.getOrElse("SPARKR_BACKEND_TIMEOUT", "120").toInt
    val mirror = universe.runtimeMirror(getClass.getClassLoader)
    val sparkRBackendClass = mirror.classLoader.loadClass("org.apache.spark.api.r.RBackend")
    val backendInstance = sparkRBackendClass.getDeclaredConstructor().newInstance()

    var sparkRBackendPort = 0
    var sparkRBackendSecret: String = null
    val initialized = new Semaphore(0)
    // Launch a SparkR backend server for the R process to connect to
    val backendThread = new Thread("SparkR backend") {
      override def run(): Unit = {
        try {
          // Older Spark: init() returns the bound port directly.
          sparkRBackendPort = sparkRBackendClass.getMethod("init").invoke(backendInstance)
            .asInstanceOf[Int]
        } catch {
          case NonFatal(e) =>
            // Newer Spark: init() returns (port, RAuthHelper), so the Int cast above
            // fails; retry the same call expecting the tuple shape instead.
            warn("Fail to init Spark RBackend, using different method signature", e)
            val retTuple = sparkRBackendClass.getMethod("init").invoke(backendInstance)
              .asInstanceOf[(Int, Object)]
            sparkRBackendPort = retTuple._1
            // Best effort: a missing secret simply disables backend auth below.
            sparkRBackendSecret = Try {
              val rAuthHelper = retTuple._2
              rAuthHelper.getClass.getMethod("secret").invoke(rAuthHelper).asInstanceOf[String]
            }.getOrElse(null)
        }

        initialized.release()
        // Blocks here serving R clients until the backend is shut down.
        sparkRBackendClass.getMethod("run").invoke(backendInstance)
      }
    }

    backendThread.setDaemon(true)
    backendThread.start()
    try {
      // Wait for RBackend initialization to finish. Fail fast on timeout rather than
      // launching the R process against an unbound port (sparkRBackendPort would
      // still be 0 and the failure would only surface later, inside R).
      if (!initialized.tryAcquire(backendTimeout, TimeUnit.SECONDS)) {
        throw new IllegalStateException(
          s"SparkR backend did not initialize in $backendTimeout seconds")
      }
      val rExec = conf.getOption("spark.r.shell.command")
        .orElse(sys.env.get("SPARKR_DRIVER_R"))
        .getOrElse("R")

      // On YARN (or yarn-cluster deploys) the sparkr package is shipped into the
      // container working directory; otherwise resolve it from the local install.
      var packageDir = ""
      if (sys.env.getOrElse("SPARK_YARN_MODE", "") == "true" ||
        (conf.get("spark.master", "").toLowerCase == "yarn" &&
          conf.get("spark.submit.deployMode", "").toLowerCase == "cluster")) {
        packageDir = "./sparkr"
      } else {
        // local mode
        val rLibPath = new File(sys.env.getOrElse("SPARKR_PACKAGE_DIR",
          Seq(sys.env.getOrElse("SPARK_HOME", "."), "R", "lib").mkString(File.separator)))
        if (!ClientConf.TEST_MODE) {
          require(rLibPath.exists(), "Cannot find sparkr package directory.")
          packageDir = rLibPath.getAbsolutePath()
        }
      }

      val builder = new ProcessBuilder(Seq(rExec, "--slave @").asJava)
      val env = builder.environment()
      env.put("SPARK_HOME", sys.env.getOrElse("SPARK_HOME", "."))
      // The R-side SparkR connects to our already-running backend on this port.
      env.put("EXISTING_SPARKR_BACKEND_PORT", sparkRBackendPort.toString)
      if (sparkRBackendSecret != null) {
        env.put("SPARKR_BACKEND_AUTH_SECRET", sparkRBackendSecret)
      }
      env.put("SPARKR_PACKAGE_DIR", packageDir)
      env.put("R_PROFILE_USER",
        Seq(packageDir, "SparkR", "profile", "general.R").mkString(File.separator))

      builder.redirectErrorStream(true)
      val process = builder.start()
      new SparkRInterpreter(process, backendInstance, backendThread,
        conf.getInt("spark.livy.spark_major_version", 1), sparkRBackendSecret != null)
    } catch {
      case e: Exception =>
        // Don't leave the daemon backend thread serving a backend nobody will use.
        if (backendThread != null) {
          backendThread.interrupt()
        }
        throw e
    }
  }