private def monitorSparkKubernetesApp()

in server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala [286:377]


  /**
   * Performs a single monitoring pass over the Kubernetes application identified by `appTag`.
   *
   * One pass:
   *  - moves the app to KILLED if `killed` is set, or to FAILED if `isProcessErrExit` is set;
   *  - looks up the [[KubernetesApplication]] by tag, completing `appPromise` on success and
   *    calling `failToGetAppId()` when the lookup throws or finds nothing;
   *  - renames the current thread after the app id and notifies listeners of the id;
   *  - creates the Spark UI ingress when `LivyConf.KUBERNETES_INGRESS_CREATE` is enabled;
   *  - if the app is still running, sleeps `pollInterval`, fetches a fresh report and publishes
   *    log, diagnostics, state and [[AppInfo]].
   *
   * The pass does not loop, so the shared monitor pool can process other applications in
   * between; presumably the caller re-schedules this method (TODO confirm against caller).
   * Failure counters are reset only when a pass finishes without an escaping exception.
   * The thread name is always restored to `appMonitorCommonThreadPool` on exit.
   */
  private def monitorSparkKubernetesApp(): Unit = {
    try {
      if (killed) {
        changeState(SparkApp.State.KILLED)
      } else if (isProcessErrExit) {
        changeState(SparkApp.State.FAILED)
      }

      // Get KubernetesApplication by appTag.
      val appOption: Option[KubernetesApplication] = try {
        getAppFromTag(appTag, pollInterval, appLookupTimeout.fromNow)
      } catch {
        case e: Exception =>
          failToGetAppId()
          // This method runs once per monitoring pass, so an earlier pass may already have
          // completed the promise. `failure` would then throw IllegalStateException, which the
          // outer handler turns into a permanent FAILED state; `tryFailure` is a no-op instead.
          appPromise.tryFailure(e)
          return
      }
      val app: KubernetesApplication = appOption match {
        case Some(found) => found
        case None =>
          failToGetAppId()
          return
      }
      appPromise.trySuccess(app)
      val appId = app.getApplicationId

      Thread.currentThread().setName(s"kubernetesAppMonitorThread-$appId")
      listener.foreach(_.appIdKnown(appId))

      if (livyConf.getBoolean(LivyConf.KUBERNETES_INGRESS_CREATE)) {
        withRetry(kubernetesClient.createSparkUIIngress(app, livyConf))
      }

      // while loop is replaced with "if" condition so that another thread can process and continue
      if (isRunning) {
        try {
          Clock.sleep(pollInterval.toMillis)

          // Refresh application state
          val appReport = withRetry {
            debug(s"getApplicationReport, applicationId: $appId, " +
              s"namespace: ${app.getApplicationNamespace} " +
              s"applicationTag: ${app.getApplicationTag}")
            kubernetesClient.getApplicationReport(livyConf, app, cacheLogSize = cacheLogSize)
          }

          kubernetesAppLog = appReport.getApplicationLog
          kubernetesDiagnostics = appReport.getApplicationDiagnostics
          changeState(mapKubernetesState(appReport.getApplicationState, appTag))

          val latestAppInfo = AppInfo(
            appReport.getDriverLogUrl,
            appReport.getTrackingUrl,
            appReport.getExecutorsLogUrls
          )
          // NOTE(review): the last published AppInfo is not kept across passes (the previous
          // version held it in a local var that was reset on every call), so listeners are
          // notified on every poll whose info differs from an empty AppInfo. Real
          // de-duplication would need a class-level field.
          if (AppInfo() != latestAppInfo) {
            listener.foreach(_.infoChanged(latestAppInfo))
          }
        } catch {
          // TODO analyse available exceptions
          // NonFatal instead of Throwable: fatal errors propagate, and InterruptedException
          // reaches the outer handler (and its failure counter) instead of being swallowed.
          case NonFatal(e) =>
            error(s"Failed to refresh application state for $appTag.", e)
        }
      }

      kubernetesTagToAppIdFailedTimes = 0
      kubernetesAppMonitorFailedTimes = 0
      debug(s"$appId $state ${kubernetesDiagnostics.mkString(" ")}")
    } catch {
      case e: InterruptedException =>
        kubernetesAppMonitorFailedTimes += 1
        if (kubernetesAppMonitorFailedTimes > appLookupMaxFailedTimes) {
          error(s"Monitoring of the app $appTag was interrupted.", e)
          kubernetesDiagnostics = ArrayBuffer(e.getMessage)
          failToMonitor()
        }
      case NonFatal(e) =>
        error(s"Error while refreshing Kubernetes state", e)
        kubernetesDiagnostics = ArrayBuffer(e.getMessage)
        changeState(SparkApp.State.FAILED)
    } finally {
      // Restore the pool thread's name on every exit path, including exceptions raised after
      // the thread was renamed for this app. Done first so a failing listener cannot skip it.
      Thread.currentThread().setName("appMonitorCommonThreadPool")
      if (!isRunning) {
        // Id of the successfully resolved app, or "unknown" if the promise is pending/failed.
        val knownAppId = appPromise.future.value
          .flatMap(_.toOption)
          .map(_.getApplicationId)
          .getOrElse("unknown")
        listener.foreach(_.infoChanged(AppInfo(sparkUiUrl = Option(buildHistoryServerUiUrl(
          livyConf, knownAppId
        )))))
      }
    }
  }