in server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala [59:112]
/**
 * Reads `LivyConf.YARN_APP_LOOKUP_TIMEOUT` from the given configuration as a
 * millisecond count and returns it as a `FiniteDuration`.
 *
 * @param livyConf configuration to read the setting from
 * @return the configured value, expressed in milliseconds
 */
private def getYarnTagToAppIdTimeout(livyConf: LivyConf): FiniteDuration =
  livyConf.getTimeAsMs(LivyConf.YARN_APP_LOOKUP_TIMEOUT).milliseconds
/**
 * Reads `LivyConf.YARN_POLL_INTERVAL` from the given configuration as a
 * millisecond count and returns it as a `FiniteDuration`.
 *
 * @param livyConf configuration to read the setting from
 * @return the configured value, expressed in milliseconds
 */
private def getYarnPollInterval(livyConf: LivyConf): FiniteDuration =
  livyConf.getTimeAsMs(LivyConf.YARN_POLL_INTERVAL).milliseconds
// Application-type filter (a Java set containing "SPARK") handed to the YARN client
// when the leaked-apps GC thread lists applications.
private[utils] val appType = Set("SPARK").asJava
// Application tags awaiting cleanup, mapped to a Long that the GC thread subtracts from
// System.currentTimeMillis() to decide whether the tag has expired.
// NOTE(review): presumably the value is the time the tag was recorded, in epoch
// milliseconds; confirm at the insertion site.
private[utils] val leakedAppTags = new java.util.concurrent.ConcurrentHashMap[String, Long]()
// Age threshold: the GC thread drops a tag with no matching application once
// (now - recorded value) exceeds this. Defaults to 0 until assigned.
private var sessionLeakageCheckTimeout: Long = _
// Pause between GC passes, passed directly to Thread.sleep (milliseconds).
// Defaults to 0 until assigned.
private var sessionLeakageCheckInterval: Long = _
/**
 * Background thread that cleans up YARN applications whose tags were recorded in
 * `leakedAppTags`.
 *
 * Each pass lists the YARN applications of type `appType` and walks the recorded tags:
 *  - if an application carries the tag, it is killed and the tag is dropped;
 *  - otherwise the tag is dropped once it is older than `sessionLeakageCheckTimeout`.
 *
 * The thread sleeps `sessionLeakageCheckInterval` milliseconds between passes.
 *
 * A non-fatal failure while listing or killing applications is logged and the affected
 * tags are kept for a later pass, so a single failing YARN call does not terminate the
 * thread. `InterruptedException` is not caught, so interrupting the thread still ends
 * the loop.
 */
private val leakedAppsGCThread = new Thread() {
  override def run(): Unit = {
    // Use the mock client when one is set; fall back to the real client otherwise.
    val client = mockYarnClient.getOrElse(yarnClient)

    while (true) {
      if (!leakedAppTags.isEmpty) {
        try {
          val iter = leakedAppTags.entrySet().iterator()
          val now = System.currentTimeMillis()
          val apps = client.getApplications(appType).asScala
          while (iter.hasNext) {
            val entry = iter.next()
            // Handle failures per tag so one failing kill does not block the other tags.
            try {
              apps.find(_.getApplicationTags.contains(entry.getKey)) match {
                case Some(app) =>
                  info(s"Kill leaked app ${app.getApplicationId}")
                  client.killApplication(app.getApplicationId)
                  // Only forget the tag once the kill request went through.
                  iter.remove()
                case None =>
                  // No matching application: give up on the tag once it is too old.
                  if ((now - entry.getValue) > sessionLeakageCheckTimeout) {
                    iter.remove()
                    info(s"Remove leaked yarn app tag ${entry.getKey}")
                  }
              }
            } catch {
              case scala.util.control.NonFatal(e) =>
                info(s"Failed to clean up leaked yarn app tag ${entry.getKey}, will retry: $e")
            }
          }
        } catch {
          // Listing applications failed; keep every tag and try again after the sleep.
          case scala.util.control.NonFatal(e) =>
            info(s"Failed to list yarn apps for leaked app cleanup, will retry: $e")
        }
      }
      Thread.sleep(sessionLeakageCheckInterval)
    }
  }
}