in server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala [55:124]
/**
 * Creates a new batch session that runs a job via spark-submit.
 *
 * Builds a deferred `SparkApp` factory (so process launch happens inside the
 * session's own lifecycle) and constructs the `BatchSession` in `Starting` state.
 *
 * @param id            numeric session id, unique within this server
 * @param name          optional user-supplied session name
 * @param request       the batch creation request (file, class, resources, conf)
 * @param livyConf      server configuration
 * @param accessManager used to validate the requested proxy-user impersonation
 * @param owner         the authenticated user creating the session
 * @param proxyUser     optional user to impersonate when submitting the job
 * @param sessionStore  store used to persist recovery metadata
 * @param mockApp       if present, used instead of a real spark-submit launch
 *                      (presumably for tests — the factory is bypassed entirely)
 * @return the new `BatchSession`, not yet finished starting
 */
def create(
id: Int,
name: Option[String],
request: CreateBatchRequest,
livyConf: LivyConf,
accessManager: AccessManager,
owner: String,
proxyUser: Option[String],
sessionStore: SessionStore,
mockApp: Option[SparkApp] = None): BatchSession = {
// Unique, lowercase tag attached to the submitted application; the random
// suffix avoids collisions if session ids are ever reused across restarts.
val appTag = s"livy-batch-$id-${Random.alphanumeric.take(8).mkString}".toLowerCase()
// Validates that `owner` may impersonate `proxyUser`; yields the effective
// impersonated user (or None) to pass to spark-submit.
val impersonatedUser = accessManager.checkImpersonation(proxyUser, owner)
// Deferred factory: invoked with the constructed session (see constructor call
// below), so the launched process can be associated with the session object.
def createSparkApp(s: BatchSession): SparkApp = {
val conf = SparkApp.prepareSparkConf(
appTag,
livyConf,
prepareConf(
request.conf, request.jars, request.files, request.archives, request.pyFiles, livyConf))
// The application file is mandatory for a batch job.
require(request.file != null, "File is required.")
val builder = new SparkProcessBuilder(livyConf)
builder.conf(conf)
// Each request field is optional; only set builder options that were provided.
impersonatedUser.foreach(builder.proxyUser)
request.className.foreach(builder.className)
request.driverMemory.foreach(builder.driverMemory)
request.driverCores.foreach(builder.driverCores)
request.executorMemory.foreach(builder.executorMemory)
request.executorCores.foreach(builder.executorCores)
request.numExecutors.foreach(builder.numExecutors)
request.queue.foreach(builder.queue)
request.name.foreach(builder.name)
// Persist recovery metadata BEFORE launching the process, so the session can
// be found again even if the server dies right after spark-submit starts.
sessionStore.save(BatchSession.RECOVERY_SESSION_TYPE, s.recoveryMetadata)
// Capture stdout via a pipe and fold stderr into it.
builder.redirectOutput(Redirect.PIPE)
builder.redirectErrorStream(true)
// Resolve the primary resource to a fully-qualified URI before submission.
val file = resolveURIs(Seq(request.file), livyConf)(0)
val sparkSubmit = builder.start(Some(file), request.args)
// Background daemon thread: wait for spark-submit to exit and log a warning
// on non-zero exit. `childProcesses` tracks how many such children are live;
// the finally block guarantees the counter is decremented even on interrupt.
Utils.startDaemonThread(s"batch-session-process-$id") {
childProcesses.incrementAndGet()
try {
sparkSubmit.waitFor() match {
case 0 =>
case exitCode =>
warn(s"spark-submit exited with code $exitCode")
}
} finally {
childProcesses.decrementAndGet()
}
}
SparkApp.create(appTag, None, Option(sparkSubmit), livyConf, Option(s))
}
info(s"Creating batch session $id: [owner: $owner, request: $request]")
new BatchSession(
id,
name,
appTag,
SessionState.Starting,
livyConf,
owner,
impersonatedUser,
sessionStore,
// Tests can short-circuit the real launch by supplying mockApp; otherwise the
// session receives the factory above and launches spark-submit itself.
mockApp.map { m => (_: BatchSession) => m }.getOrElse(createSparkApp))
}