def start()

in server/src/main/scala/org/apache/livy/utils/SparkProcessBuilder.scala [157:216]


  def start(file: Option[String], args: Traversable[String]): LineBufferedProcess = {
    var arguments = ArrayBuffer(_executable)

    def addOpt(option: String, value: Option[String]): Unit = {
      value.foreach { v =>
        arguments += option
        arguments += v
      }
    }

    def addList(option: String, values: Traversable[String]): Unit = {
      if (values.nonEmpty) {
        arguments += option
        arguments += values.mkString(",")
      }
    }

    addOpt("--master", _master)
    addOpt("--deploy-mode", _deployMode)
    addOpt("--name", _name)
    addOpt("--class", _className)
    _conf.foreach { case (key, value) =>
      if (key == "spark.submit.pyFiles") {
         arguments += "--py-files"
         arguments += f"$value"
      } else {
         arguments += "--conf"
         arguments += f"$key=$value"
      }
    }
    addList("--driver-class-path", _driverClassPath)

    if (livyConf.getBoolean(LivyConf.IMPERSONATION_ENABLED)) {
      addOpt("--proxy-user", _proxyUser)
    }

    addOpt("--queue", _queue)

    arguments += file.getOrElse("spark-internal")
    arguments ++= args

    val argsString = arguments
      .map("'" + _.replace("'", "\\'") + "'")
      .mkString(" ")

    info(s"Running $argsString")

    val pb = new ProcessBuilder(arguments.asJava)
    val env = pb.environment()

    for ((key, value) <- _env) {
      env.put(key, value)
    }

    _redirectOutput.foreach(pb.redirectOutput)
    _redirectError.foreach(pb.redirectError)
    _redirectErrorStream.foreach(pb.redirectErrorStream)

    new LineBufferedProcess(pb.start(), livyConf.getInt(LivyConf.SPARK_LOGS_SIZE))
  }