in server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala [286:377]
/**
 * Runs one monitoring pass for the Kubernetes application identified by `appTag`.
 *
 * A pass performs, in order:
 *  - moves the state to KILLED / FAILED when `killed` / `isProcessErrExit` is set;
 *  - resolves the [[KubernetesApplication]] through `getAppFromTag` and publishes it on
 *    `appPromise`; when the lookup throws or yields nothing, `failToGetAppId()` is called
 *    and the pass ends early;
 *  - notifies the listener of the application id and, when
 *    `LivyConf.KUBERNETES_INGRESS_CREATE` is enabled, creates the Spark UI ingress;
 *  - if the app is still running, sleeps for `pollInterval`, fetches a fresh application
 *    report and updates the cached log, diagnostics, state and [[AppInfo]].
 *
 * Whatever the outcome, once the app is no longer running the listener receives an
 * [[AppInfo]] carrying the history-server UI URL.
 *
 * The refresh step polls at most once per call (an `if`, not a `while`), so this method is
 * presumably re-submitted by its caller for as long as the app is running.
 */
private def monitorSparkKubernetesApp(): Unit = {
  // Set once this pass has renamed the current thread, so `finally` can always undo it.
  var threadRenamed = false
  try {
    if (killed) {
      changeState(SparkApp.State.KILLED)
    } else if (isProcessErrExit) {
      changeState(SparkApp.State.FAILED)
    }
    // Get KubernetesApplication by appTag.
    val appOption: Option[KubernetesApplication] = try {
      getAppFromTag(appTag, pollInterval, appLookupTimeout.fromNow)
    } catch {
      case e: Exception =>
        failToGetAppId()
        // tryFailure (not failure): the promise may already be completed by an earlier
        // pass, and Promise.failure would then throw IllegalStateException from here.
        appPromise.tryFailure(e)
        return
    }
    val app: KubernetesApplication = appOption match {
      case Some(found) => found
      case None =>
        failToGetAppId()
        return
    }
    // trySuccess: a no-op when the promise has already been completed.
    appPromise.trySuccess(app)
    val appId = app.getApplicationId
    Thread.currentThread().setName(s"kubernetesAppMonitorThread-$appId")
    threadRenamed = true
    listener.foreach(_.appIdKnown(appId))
    if (livyConf.getBoolean(LivyConf.KUBERNETES_INGRESS_CREATE)) {
      withRetry(kubernetesClient.createSparkUIIngress(app, livyConf))
    }
    // NOTE(review): this baseline is local, so it is rebuilt on every pass and the change
    // check below effectively compares against an empty AppInfo. Persisting the last
    // reported AppInfo would need state that lives outside this method.
    val appInfo = AppInfo()
    // while loop is replaced with "if" condition so that another thread can process and continue
    if (isRunning) {
      try {
        Clock.sleep(pollInterval.toMillis)
        // Refresh application state
        val appReport = withRetry {
          debug(s"getApplicationReport, applicationId: ${app.getApplicationId}, " +
            s"namespace: ${app.getApplicationNamespace} " +
            s"applicationTag: ${app.getApplicationTag}")
          kubernetesClient.getApplicationReport(livyConf, app, cacheLogSize = cacheLogSize)
        }
        kubernetesAppLog = appReport.getApplicationLog
        kubernetesDiagnostics = appReport.getApplicationDiagnostics
        changeState(mapKubernetesState(appReport.getApplicationState, appTag))
        val latestAppInfo = AppInfo(
          appReport.getDriverLogUrl,
          appReport.getTrackingUrl,
          appReport.getExecutorsLogUrls
        )
        if (appInfo != latestAppInfo) {
          listener.foreach(_.infoChanged(latestAppInfo))
        }
      } catch {
        // TODO analyse available exceptions
        // Only non-fatal failures are treated as transient and logged. InterruptedException
        // and fatal errors propagate, so the interrupt handler below can account for them.
        case NonFatal(e) =>
          error(s"Failed to refresh application state for $appTag.", e)
      }
    }
    // The pass finished without propagating an error: reset both failure counters.
    kubernetesTagToAppIdFailedTimes = 0
    kubernetesAppMonitorFailedTimes = 0
    debug(s"$appId $state ${kubernetesDiagnostics.mkString(" ")}")
  } catch {
    case e: InterruptedException =>
      kubernetesAppMonitorFailedTimes += 1
      // Give up on monitoring only after more than appLookupMaxFailedTimes interruptions.
      if (kubernetesAppMonitorFailedTimes > appLookupMaxFailedTimes) {
        error(s"Monitoring of the app $appTag was interrupted.", e)
        kubernetesDiagnostics = ArrayBuffer(e.getMessage)
        failToMonitor()
      }
    case NonFatal(e) =>
      error("Error while refreshing Kubernetes state", e)
      kubernetesDiagnostics = ArrayBuffer(e.getMessage)
      changeState(SparkApp.State.FAILED)
  } finally {
    // Give the thread its generic name back on every exit path, not just on success.
    if (threadRenamed) {
      Thread.currentThread().setName("appMonitorCommonThreadPool")
    }
    if (!isRunning) {
      // Falls back to "unknown" when the promise is incomplete or failed.
      listener.foreach(_.infoChanged(AppInfo(sparkUiUrl = Option(buildHistoryServerUiUrl(
        livyConf, Try(appPromise.future.value.get.get.getApplicationId).getOrElse("unknown")
      )))))
    }
  }
}