override def start()

in server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala [459:529]


  override def start(): Unit = {
    sessionStore.save(RECOVERY_SESSION_TYPE, recoveryMetadata)
    heartbeat()
    app = mockApp.orElse {
      val driverProcess = client.flatMap { c => Option(c.getDriverProcess) }
        .map(new LineBufferedProcess(_, livyConf.getInt(LivyConf.SPARK_LOGS_SIZE)))
      if (!livyConf.isRunningOnKubernetes()) {
        driverProcess.map(_ => SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this)))
      } else {
        // Create SparkApp for Kubernetes anyway
        Some(SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this)))
      }
    }

    if (client.isEmpty) {
      transition(Dead())
      val msg = s"Cannot recover interactive session $id because its RSCDriver URI is unknown."
      info(msg)
      sessionLog = IndexedSeq(msg)
    } else {
      val uriFuture = Future {
        client.get.getServerUri.get()
      }(sessionManageExecutors)

      uriFuture.onSuccess { case url =>
        rscDriverUri = Option(url)
        sessionSaveLock.synchronized {
          sessionStore.save(RECOVERY_SESSION_TYPE, recoveryMetadata)
        }
      }(sessionManageExecutors)

      uriFuture.onFailure {
        case e => warn("Fail to get rsc uri", e)
      }(sessionManageExecutors)

      // Send a dummy job that will return once the client is ready to be used, and set the
      // state to "idle" at that point.
      client.get.submit(new PingJob()).addListener(new JobHandle.Listener[Void]() {
      override def onJobQueued(job: JobHandle[Void]): Unit = { }
      override def onJobStarted(job: JobHandle[Void]): Unit = { }

        override def onJobCancelled(job: JobHandle[Void]): Unit = errorOut()

        override def onJobFailed(job: JobHandle[Void], cause: Throwable): Unit = errorOut()

        override def onJobSucceeded(job: JobHandle[Void], result: Void): Unit = {
          transition(SessionState.Running)
          info(s"Interactive session $id created [appid: ${appId.orNull}, " +
            s"owner: $owner, proxyUser:" +
            s" $proxyUser, state: ${state.toString}, kind: ${kind.toString}, " +
            s"info: ${appInfo.asJavaMap}]")
        }

        private def errorOut(): Unit = {
          // Other code might call stop() to close the RPC channel. When RPC channel is closing,
          // this callback might be triggered. Check and don't call stop() to avoid nested called
          // if the session is already shutting down.
          if (serverSideState != SessionState.ShuttingDown) {
            transition(SessionState.Error())
            stop()
            app.foreach { a =>
              info(s"Failed to ping RSC driver for session $id. Killing application.")
              a.kill()
            }
          }
        }
      })
    }
    startedOn = Some(System.nanoTime())
    info(s"Started $this")
  }