in server/src/main/scala/org/apache/livy/utils/SparkProcessBuilder.scala [157:216]
/**
 * Assembles the full spark-submit command line from the builder's accumulated
 * settings and launches the process.
 *
 * @param file the primary resource (application jar / script) to submit; when
 *             `None`, the literal "spark-internal" is passed so spark-submit
 *             resolves the application from `--class` alone.
 * @param args extra command-line arguments appended after the primary resource.
 * @return the started process wrapped in a [[LineBufferedProcess]] that buffers
 *         up to `LivyConf.SPARK_LOGS_SIZE` output lines.
 */
def start(file: Option[String], args: Traversable[String]): LineBufferedProcess = {
  // The buffer itself is mutable; the binding is never reassigned, so `val`
  // (was `var`) is the correct, least-powerful choice.
  val arguments = ArrayBuffer(_executable)

  // Appends "option value" only when the optional value is present.
  def addOpt(option: String, value: Option[String]): Unit = {
    value.foreach { v =>
      arguments += option
      arguments += v
    }
  }

  // Appends "option v1,v2,..." only when the list is non-empty.
  def addList(option: String, values: Traversable[String]): Unit = {
    if (values.nonEmpty) {
      arguments += option
      arguments += values.mkString(",")
    }
  }

  addOpt("--master", _master)
  addOpt("--deploy-mode", _deployMode)
  addOpt("--name", _name)
  addOpt("--class", _className)
  _conf.foreach { case (key, value) =>
    // spark.submit.pyFiles must be surfaced via --py-files rather than --conf
    // so spark-submit distributes the Python dependencies correctly.
    if (key == "spark.submit.pyFiles") {
      arguments += "--py-files"
      arguments += value
    } else {
      arguments += "--conf"
      // `s` interpolator (was `f` with no format specifiers — same output,
      // correct idiom).
      arguments += s"$key=$value"
    }
  }
  addList("--driver-class-path", _driverClassPath)
  if (livyConf.getBoolean(LivyConf.IMPERSONATION_ENABLED)) {
    addOpt("--proxy-user", _proxyUser)
  }
  addOpt("--queue", _queue)
  arguments += file.getOrElse("spark-internal")
  arguments ++= args

  // Single-quote each argument purely for log readability; this string is
  // never executed — ProcessBuilder below receives the raw argument list.
  val argsString = arguments
    .map("'" + _.replace("'", "\\'") + "'")
    .mkString(" ")
  info(s"Running $argsString")

  val pb = new ProcessBuilder(arguments.asJava)
  val env = pb.environment()
  for ((key, value) <- _env) {
    env.put(key, value)
  }
  // Optional stream redirections configured on the builder.
  _redirectOutput.foreach(pb.redirectOutput)
  _redirectError.foreach(pb.redirectError)
  _redirectErrorStream.foreach(pb.redirectErrorStream)
  new LineBufferedProcess(pb.start(), livyConf.getInt(LivyConf.SPARK_LOGS_SIZE))
}