override def run()

in server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala [194:222]


    override def run(): Unit = {
      while (true) {
        try {
          val poolSize = livyConf.getInt(LivyConf.KUBERNETES_APP_LOOKUP_THREAD_POOL_SIZE)
          var numberOfAppsToProcess = appQueue.size() / poolSize
          if (numberOfAppsToProcess < 1) {
            numberOfAppsToProcess = 1
          } else if (numberOfAppsToProcess > 20) {
            numberOfAppsToProcess = 20
          }
          for (_ <- 0 until numberOfAppsToProcess) {
            // update time when monitor app so that
            // checkMonitorAppTimeoutThread can check whether the thread was blocked on monitoring
            monitorAppThreadMap.put(Thread.currentThread(), System.currentTimeMillis())
            val app = appQueue.poll()
            if (app != null) {
              app.monitorSparkKubernetesApp()
              if (app.isRunning) {
                appQueue.add(app)
              }
            }
          }
          Thread.sleep(pollInterval.toMillis)
        } catch {
          case e: InterruptedException =>
            error(s"Kubernetes app monitoring was interrupted.", e)
        }
      }
    }