in server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala [194:222]
/**
 * Worker loop that repeatedly polls queued Kubernetes apps and refreshes their state.
 *
 * Each pass processes a batch of apps taken from `appQueue`. The batch size is the queue
 * size divided by the configured lookup thread pool size, clamped to the range 1..20.
 * Every polled app is monitored and put back on the queue if it is still running, after
 * which the thread sleeps for `pollInterval`.
 *
 * The loop never returns. An `InterruptedException` is logged and the loop starts its
 * next pass. A non-fatal failure while monitoring a single app is logged and does not
 * terminate the loop.
 */
override def run(): Unit = {
  import scala.util.control.NonFatal

  while (true) {
    try {
      val poolSize = livyConf.getInt(LivyConf.KUBERNETES_APP_LOOKUP_THREAD_POOL_SIZE)
      // Take this thread's share of the queue, but always at least 1 and at most 20 apps.
      val numberOfAppsToProcess = math.min(20, math.max(1, appQueue.size() / poolSize))
      for (_ <- 0 until numberOfAppsToProcess) {
        // update time when monitor app so that
        // checkMonitorAppTimeoutThread can check whether the thread was blocked on monitoring
        monitorAppThreadMap.put(Thread.currentThread(), System.currentTimeMillis())
        // poll() yields null when nothing is queued; Option(...) turns that into None.
        Option(appQueue.poll()).foreach { app =>
          try {
            app.monitorSparkKubernetesApp()
          } catch {
            // NonFatal does not match InterruptedException, so interrupts still reach the
            // outer handler. Anything else would otherwise escape the loop, end this
            // thread, and lose the app that was already removed from the queue.
            case NonFatal(e) =>
              error("Unexpected failure while monitoring Kubernetes app.", e)
          }
          // Only apps that are still running need further monitoring.
          if (app.isRunning) {
            appQueue.add(app)
          }
        }
      }
      Thread.sleep(pollInterval.toMillis)
    } catch {
      case e: InterruptedException =>
        error(s"Kubernetes app monitoring was interrupted.", e)
    }
  }
}