private def getYarnTagToAppIdTimeout()

in server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala [59:112]


  /**
   * Reads the `LivyConf.YARN_APP_LOOKUP_TIMEOUT` setting as a millisecond count and returns it
   * as a [[FiniteDuration]].
   *
   * @param livyConf configuration to read the setting from
   * @return the configured value expressed in milliseconds
   */
  private def getYarnTagToAppIdTimeout(livyConf: LivyConf): FiniteDuration = {
    val lookupTimeoutMs = livyConf.getTimeAsMs(LivyConf.YARN_APP_LOOKUP_TIMEOUT)
    lookupTimeoutMs.milliseconds
  }

  /**
   * Reads the `LivyConf.YARN_POLL_INTERVAL` setting as a millisecond count and returns it as a
   * [[FiniteDuration]].
   *
   * @param livyConf configuration to read the setting from
   * @return the configured value expressed in milliseconds
   */
  private def getYarnPollInterval(livyConf: LivyConf): FiniteDuration = {
    val pollIntervalMs = livyConf.getTimeAsMs(LivyConf.YARN_POLL_INTERVAL)
    pollIntervalMs.milliseconds
  }

  // Application types used to filter the listing in `leakedAppsGCThread`; converted to a Java
  // set because it is handed to `client.getApplications`.
  private[utils] val appType = Set("SPARK").asJava

  // Application tags awaiting cleanup by `leakedAppsGCThread`, keyed by tag. The value is a
  // millisecond timestamp that the GC thread subtracts from `System.currentTimeMillis()` to
  // decide when to give up on a tag (presumably the time the tag was recorded -- confirm
  // against the code that inserts entries).
  private[utils] val leakedAppTags = new java.util.concurrent.ConcurrentHashMap[String, Long]()

  // How long, in milliseconds, a tag may stay in `leakedAppTags` without a matching application
  // being found before the GC thread drops it. Starts at 0 (`_`); expected to be assigned
  // elsewhere before the GC thread runs -- TODO confirm.
  private var sessionLeakageCheckTimeout: Long = _

  // Sleep, in milliseconds, between two passes of the GC thread. Starts at 0 (`_`); expected to
  // be assigned elsewhere before the GC thread runs -- TODO confirm.
  private var sessionLeakageCheckInterval: Long = _

  /**
   * Background thread that periodically reaps applications whose tags were recorded in
   * `leakedAppTags`.
   *
   * On every pass, if any tags are recorded, it lists the applications of type `appType` once
   * and then, for each recorded tag:
   *  - kills the first listed application carrying that tag and forgets the tag, or
   *  - forgets the tag once it has been tracked for longer than `sessionLeakageCheckTimeout`
   *    milliseconds without being cleaned up.
   * It then sleeps for `sessionLeakageCheckInterval` milliseconds.
   *
   * Non-fatal failures while listing or killing applications are logged and the work is retried
   * on the next pass, so a transient error no longer terminates the thread. An interrupt still
   * ends the loop because `InterruptedException` is not matched by `NonFatal`.
   */
  private val leakedAppsGCThread = new Thread() {
    override def run(): Unit = {
      import scala.util.control.NonFatal

      // `mockYarnClient`, when defined, takes precedence over the regular `yarnClient`.
      val client = mockYarnClient.getOrElse(yarnClient)

      while (true) {
        if (!leakedAppTags.isEmpty) {
          try {
            val now = System.currentTimeMillis()
            // One listing per pass is shared by every tag checked below.
            val apps = client.getApplications(appType).asScala
            val iter = leakedAppTags.entrySet().iterator()

            while (iter.hasNext) {
              val entry = iter.next()
              val tag = entry.getKey

              // NOTE(review): the tag comparison is case-sensitive; confirm the stored tag uses
              // the same case as the tags reported for the application.
              val killed = apps.find(_.getApplicationTags.contains(tag)).exists { app =>
                try {
                  info(s"Kill leaked app ${app.getApplicationId}")
                  client.killApplication(app.getApplicationId)
                  true
                } catch {
                  case NonFatal(e) =>
                    // Keep the tag so the kill is retried next pass; the timeout check below
                    // still bounds how long we keep retrying.
                    info(s"Failed to kill leaked app ${app.getApplicationId}: $e")
                    false
                }
              }

              if (killed) {
                iter.remove()
              } else if ((now - entry.getValue) > sessionLeakageCheckTimeout) {
                // Give up on tags that have been tracked for too long.
                iter.remove()
                info(s"Remove leaked yarn app tag $tag")
              }
            }
          } catch {
            case NonFatal(e) =>
              // Typically a failed listing call; leave all tags in place and try again later.
              info(s"Failed to check leaked apps, will retry on the next pass: $e")
          }
        }
        // Deliberately outside the try: an interrupt during sleep must still stop the thread.
        Thread.sleep(sessionLeakageCheckInterval)
      }
    }
  }