def create()

in server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala [55:124]


  /**
   * Builds a new [[BatchSession]] in the `SessionState.Starting` state.
   *
   * Impersonation is checked immediately through `accessManager.checkImpersonation`. The
   * session is then constructed with a `BatchSession => SparkApp` factory: either one that
   * always returns `mockApp` (when supplied), or `createSparkApp`, which persists the
   * session's recovery metadata, launches `spark-submit` through a [[SparkProcessBuilder]]
   * and wraps the resulting process in a [[SparkApp]].
   *
   * @param id batch session id; also embedded in the application tag and the name of the
   *           thread that waits on the `spark-submit` process
   * @param name optional session name, passed through to the [[BatchSession]]
   * @param request the batch submission request (file, args, conf, resources, ...)
   * @param livyConf Livy configuration used for conf preparation, URI resolution and the
   *                 process builder
   * @param accessManager used to validate `proxyUser` against `owner`
   * @param owner the user creating the session
   * @param proxyUser optional user to impersonate
   * @param sessionStore store that receives the session's recovery metadata
   * @param mockApp when defined, used as the session's [[SparkApp]] instead of launching
   *                `spark-submit`
   * @return the newly constructed session
   * @throws IllegalArgumentException from the app factory when `request.file` is null
   */
  def create(
      id: Int,
      name: Option[String],
      request: CreateBatchRequest,
      livyConf: LivyConf,
      accessManager: AccessManager,
      owner: String,
      proxyUser: Option[String],
      sessionStore: SessionStore,
      mockApp: Option[SparkApp] = None): BatchSession = {
    // The tag is a machine-readable identifier, so lower-case it with a fixed locale: the
    // no-arg toLowerCase() follows the JVM default locale and would turn 'I' into a dotless
    // 'ı' under e.g. a Turkish default locale.
    val randomSuffix = Random.alphanumeric.take(8).mkString
    val appTag = s"livy-batch-$id-$randomSuffix".toLowerCase(java.util.Locale.ROOT)
    val impersonatedUser = accessManager.checkImpersonation(proxyUser, owner)

    // Factory handed to the session: launches spark-submit for `s` and returns its SparkApp.
    def createSparkApp(s: BatchSession): SparkApp = {
      val conf = SparkApp.prepareSparkConf(
        appTag,
        livyConf,
        prepareConf(
          request.conf, request.jars, request.files, request.archives, request.pyFiles, livyConf))
      require(request.file != null, "File is required.")

      val builder = new SparkProcessBuilder(livyConf)
      builder.conf(conf)

      // Forward only the optional settings that were actually provided.
      impersonatedUser.foreach(builder.proxyUser)
      request.className.foreach(builder.className)
      request.driverMemory.foreach(builder.driverMemory)
      request.driverCores.foreach(builder.driverCores)
      request.executorMemory.foreach(builder.executorMemory)
      request.executorCores.foreach(builder.executorCores)
      request.numExecutors.foreach(builder.numExecutors)
      request.queue.foreach(builder.queue)
      request.name.foreach(builder.name)

      // Recovery metadata is written before the spark-submit process is started.
      sessionStore.save(BatchSession.RECOVERY_SESSION_TYPE, s.recoveryMetadata)

      builder.redirectOutput(Redirect.PIPE)
      builder.redirectErrorStream(true)

      val file = resolveURIs(Seq(request.file), livyConf)(0)
      val sparkSubmit = builder.start(Some(file), request.args)

      // Wait for spark-submit on a daemon thread, keeping `childProcesses` incremented for
      // as long as the wait lasts; the counter is restored even if waiting fails.
      Utils.startDaemonThread(s"batch-session-process-$id") {
        childProcesses.incrementAndGet()
        try {
          sparkSubmit.waitFor() match {
            case 0 =>
            case exitCode =>
              warn(s"spark-submit exited with code $exitCode")
          }
        } finally {
          childProcesses.decrementAndGet()
        }
      }
      SparkApp.create(appTag, None, Option(sparkSubmit), livyConf, Option(s))
    }

    info(s"Creating batch session $id: [owner: $owner, request: $request]")

    new BatchSession(
      id,
      name,
      appTag,
      SessionState.Starting,
      livyConf,
      owner,
      impersonatedUser,
      sessionStore,
      // A supplied mock app replaces the real launch path entirely.
      mockApp.map { m => (_: BatchSession) => m }.getOrElse(createSparkApp))
  }