in server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala [459:529]
override def start(): Unit = {
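  // Persist recovery metadata up front so the session can be recovered after a server
  // restart, and record an initial heartbeat.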
  sessionStore.save(RECOVERY_SESSION_TYPE, recoveryMetadata)
  heartbeat()
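  // Build the handle that tracks the Spark application backing this session. mockApp is
  // normally only supplied by tests; otherwise wrap the client's driver process (if any)
  // in a LineBufferedProcess so its output is captured, bounded by SPARK_LOGS_SIZE.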
  app = mockApp.orElse {
    val driverProcess = client.flatMap { c => Option(c.getDriverProcess) }
      .map(new LineBufferedProcess(_, livyConf.getInt(LivyConf.SPARK_LOGS_SIZE)))
    if (!livyConf.isRunningOnKubernetes()) {
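      // Outside Kubernetes, only track an application when a local driver process exists.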
      driverProcess.map(_ => SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this)))
    } else {
      // On Kubernetes, create a SparkApp even when there is no local driver process.
      Some(SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this)))
    }
  }
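  // Without an RSC client the driver cannot be reached (its URI was not recovered), so the
  // session can only be marked dead.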
  if (client.isEmpty) {
    transition(Dead())
    val msg = s"Cannot recover interactive session $id because its RSCDriver URI is unknown."
    info(msg)
    sessionLog = IndexedSeq(msg)
  } else {
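    // Resolve the RSC driver URI asynchronously and persist it with the recovery metadata
    // once it is known, so a restarted server can reconnect to the driver.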
    val uriFuture = Future {
      client.get.getServerUri.get()
    }(sessionManageExecutors)
    uriFuture.onSuccess { case url =>
      rscDriverUri = Option(url)
      sessionSaveLock.synchronized {
        sessionStore.save(RECOVERY_SESSION_TYPE, recoveryMetadata)
      }
    }(sessionManageExecutors)
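    // A failure to resolve the URI is only logged; the ping job below still decides the
    // session's final state.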
    uriFuture.onFailure {
      case e => warn("Failed to get the RSC driver URI", e)
    }(sessionManageExecutors)
    // Send a dummy job that will return once the client is ready to be used, and set the
    // state to "idle" at that point.
    client.get.submit(new PingJob()).addListener(new JobHandle.Listener[Void]() {
      override def onJobQueued(job: JobHandle[Void]): Unit = { }
      override def onJobStarted(job: JobHandle[Void]): Unit = { }
      override def onJobCancelled(job: JobHandle[Void]): Unit = errorOut()
      override def onJobFailed(job: JobHandle[Void], cause: Throwable): Unit = errorOut()
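      // The ping succeeded, so the RPC channel to the driver is usable.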
      override def onJobSucceeded(job: JobHandle[Void], result: Void): Unit = {
        transition(SessionState.Running)
        info(s"Interactive session $id created [appid: ${appId.orNull}, " +
          s"owner: $owner, proxyUser:" +
          s" $proxyUser, state: ${state.toString}, kind: ${kind.toString}, " +
          s"info: ${appInfo.asJavaMap}]")
      }
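      // Marks the session as errored and tears down the client and the backing application
      // when the ping is cancelled or fails.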
      private def errorOut(): Unit = {
        // Other code might call stop() to close the RPC channel. When the RPC channel is
        // closing, this callback might be triggered. Check the state and skip stop() to
        // avoid a nested call if the session is already shutting down.
        if (serverSideState != SessionState.ShuttingDown) {
          transition(SessionState.Error())
          stop()
          app.foreach { a =>
            info(s"Failed to ping RSC driver for session $id. Killing application.")
            a.kill()
          }
        }
      }
    })
  }
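  // Record the start time with a monotonic clock (System.nanoTime), suitable only for
  // measuring elapsed time.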
  startedOn = Some(System.nanoTime())
  info(s"Started $this")
}