private def startTimeoutChecker()

in thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala [358:414]


  /**
   * Submits a background task to `backgroundOperationPool` that periodically closes
   * timed-out sessions and, for sessions that are not closed, their timed-out operations.
   *
   * Between scans the task waits on `timeoutCheckerLock` for `checkInterval` milliseconds
   * (never less than 3 seconds). It keeps scanning until `shutdown` is set.
   */
  private def startTimeoutChecker(): Unit = {
    import scala.util.control.NonFatal

    // Minimum 3 seconds between scans.
    val interval: Long = Math.max(checkInterval, 3000L)
    val timeoutChecker: Runnable = new Runnable() {
      override def run(): Unit = {
        sleepFor(interval)
        while (!shutdown) {
          val current: Long = System.currentTimeMillis
          val iterator = sessionHandleToLivySession.entrySet().iterator()
          while (iterator.hasNext && !shutdown) {
            val entry = iterator.next()
            val sessionHandle = entry.getKey
            // Entries whose `value` is empty, or whose `toOption` yields nothing, are skipped.
            entry.getValue.value.flatMap(_.toOption).foreach { livySession =>
              val sessionExpired = sessionTimeout > 0 &&
                livySession.lastActivity + sessionTimeout <= current &&
                (!checkOperation || getNoOperationTime(sessionHandle) > sessionTimeout)

              if (sessionExpired) {
                warn(s"Session $sessionHandle is Timed-out (last access : " +
                  new Date(livySession.lastActivity) + ") and will be closed")
                try {
                  closeSession(sessionHandle)
                } catch {
                  // Catch every non-fatal failure (not only HiveSQLException): an exception
                  // escaping run() would end this task and stop all future timeout checks.
                  case NonFatal(e) =>
                    warn(s"Exception is thrown closing session $sessionHandle", e)
                }
              } else {
                // The session stays open; close only its timed-out operations.
                operationManager.getTimedOutOperations(sessionHandle).foreach { op =>
                  try {
                    warn(s"Operation ${op.opHandle} is timed-out and will be closed")
                    operationManager.closeOperation(op.opHandle)
                  } catch {
                    case e: Exception =>
                      warn("Exception is thrown closing timed-out operation: " + op.opHandle, e)
                  }
                }
              }
            }
          }
          sleepFor(interval)
        }
      }

      /**
       * Waits on `timeoutCheckerLock` for at most `interval` milliseconds. An interrupt
       * simply ends the wait early; the caller re-checks `shutdown` afterwards.
       */
      private def sleepFor(interval: Long): Unit = {
        timeoutCheckerLock.synchronized {
          try {
            timeoutCheckerLock.wait(interval)
          } catch {
            case _: InterruptedException =>
            // Ignore, and break.
          }
        }
      }
    }
    backgroundOperationPool.execute(timeoutChecker)
  }