in server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala [454:517]
/**
 * Starts this interactive session.
 *
 * Persists the recovery metadata, records a heartbeat and attaches a [[SparkApp]] (unless a
 * mock app was supplied). When no RSC client is available the session is transitioned to
 * `Dead` right away. Otherwise the RSC driver URI is resolved asynchronously and persisted,
 * and a ping job is submitted: on success the session transitions to `SessionState.Running`;
 * on failure or cancellation it transitions to `SessionState.Error`, is stopped, and its
 * application (if any) is killed.
 */
override def start(): Unit = {
  sessionStore.save(RECOVERY_SESSION_TYPE, recoveryMetadata)
  heartbeat()

  app = mockApp.orElse {
    // Wrap the driver process (when the client exposes one) so its output is line-buffered.
    val driverProcess = client.flatMap { c => Option(c.getDriverProcess) }
      .map(new LineBufferedProcess(_, livyConf.getInt(LivyConf.SPARK_LOGS_SIZE)))

    if (livyConf.isRunningOnYarn() || driverProcess.isDefined) {
      Some(SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this)))
    } else {
      None
    }
  }

  // Bind the client once instead of checking isEmpty and then calling Option.get repeatedly.
  client match {
    case None =>
      transition(Dead())
      val msg = s"Cannot recover interactive session $id because its RSCDriver URI is unknown."
      info(msg)
      sessionLog = IndexedSeq(msg)

    case Some(rscClient) =>
      // getServerUri.get() blocks, so resolve the URI off the calling thread.
      val uriFuture = Future { rscClient.getServerUri.get() }

      // foreach / failed.foreach replace onSuccess / onFailure, which are deprecated since
      // Scala 2.12 and removed in 2.13; the callbacks themselves are unchanged.
      uriFuture.foreach { url =>
        rscDriverUri = Option(url)
        sessionSaveLock.synchronized {
          sessionStore.save(RECOVERY_SESSION_TYPE, recoveryMetadata)
        }
      }
      uriFuture.failed.foreach { e => warn("Fail to get rsc uri", e) }

      // Send a dummy job that will return once the client is ready to be used, and move the
      // session to the Running state at that point.
      rscClient.submit(new PingJob()).addListener(new JobHandle.Listener[Void]() {
        override def onJobQueued(job: JobHandle[Void]): Unit = { }

        override def onJobStarted(job: JobHandle[Void]): Unit = { }

        override def onJobCancelled(job: JobHandle[Void]): Unit = errorOut()

        override def onJobFailed(job: JobHandle[Void], cause: Throwable): Unit = errorOut()

        override def onJobSucceeded(job: JobHandle[Void], result: Void): Unit = {
          transition(SessionState.Running)
          info(s"Interactive session $id created [appid: ${appId.orNull}, " +
            s"owner: $owner, proxyUser:" +
            s" $proxyUser, state: ${state.toString}, kind: ${kind.toString}, " +
            s"info: ${appInfo.asJavaMap}]")
        }

        /** Marks the session as failed, stops it and kills the attached application. */
        private def errorOut(): Unit = {
          // Other code might call stop() to close the RPC channel. While the RPC channel is
          // closing, this callback might be triggered. Skip the cleanup if the session is
          // already shutting down, to avoid a nested call to stop().
          if (serverSideState != SessionState.ShuttingDown) {
            transition(SessionState.Error())
            stop()
            app.foreach { a =>
              info(s"Failed to ping RSC driver for session $id. Killing application.")
              a.kill()
            }
          }
        }
      })
  }
}