in thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala [358:414]
/**
 * Starts the background task that periodically closes idle sessions and timed-out operations.
 *
 * The task is submitted to `backgroundOperationPool` and loops until `shutdown` is set, waiting
 * on `timeoutCheckerLock` between passes. On every pass it walks `sessionHandleToLivySession`
 * and, for each entry whose value has already completed successfully:
 *  - closes the session when `sessionTimeout` is positive, the session's last activity is at
 *    least `sessionTimeout` old and, if `checkOperation` is set, the value returned by
 *    `getNoOperationTime` also exceeds `sessionTimeout`;
 *  - otherwise closes the operations returned by `operationManager.getTimedOutOperations`.
 *
 * A non-fatal failure while handling one session is logged and does not stop the checker.
 */
private def startTimeoutChecker(): Unit = {
  import scala.util.control.NonFatal

  // Never poll more often than every 3 seconds, whatever `checkInterval` is.
  val interval: Long = Math.max(checkInterval, 3000L)
  val timeoutChecker: Runnable = new Runnable() {
    override def run(): Unit = {
      sleepFor(interval)
      while (!shutdown) {
        val current: Long = System.currentTimeMillis
        val iterator = sessionHandleToLivySession.entrySet().iterator()
        // `shutdown` is re-checked per entry so a long pass can be cut short.
        while (iterator.hasNext && !shutdown) {
          val entry = iterator.next()
          val sessionHandle = entry.getKey
          // Entries whose value has not completed yet, or completed with a failure, are skipped.
          entry.getValue.value.flatMap(_.toOption).foreach { livySession =>
            try {
              if (sessionTimeout > 0 && livySession.lastActivity + sessionTimeout <= current &&
                  (!checkOperation || getNoOperationTime(sessionHandle) > sessionTimeout)) {
                warn(s"Session $sessionHandle is Timed-out (last access : " +
                  new Date(livySession.lastActivity) + ") and will be closed")
                try {
                  closeSession(sessionHandle)
                } catch {
                  // Any non-fatal failure (not only HiveSQLException) must be contained here,
                  // otherwise it would escape run() and end the checker for good.
                  case NonFatal(e) =>
                    warn(s"Exception is thrown closing session $sessionHandle", e)
                }
              } else {
                // The session itself is still alive: only expire its timed-out operations.
                operationManager.getTimedOutOperations(sessionHandle).foreach { op =>
                  try {
                    warn(s"Operation ${op.opHandle} is timed-out and will be closed")
                    operationManager.closeOperation(op.opHandle)
                  } catch {
                    case e: Exception =>
                      warn("Exception is thrown closing timed-out operation: " + op.opHandle, e)
                  }
                }
              }
            } catch {
              // Covers the timeout evaluation and the operation lookup, so that one broken
              // session cannot prevent the remaining ones from being checked.
              case NonFatal(e) =>
                warn(s"Exception is thrown checking timeouts of session $sessionHandle", e)
            }
          }
        }
        sleepFor(interval)
      }
    }

    /**
     * Waits on `timeoutCheckerLock` for at most `interval` milliseconds. Returns earlier if the
     * lock is notified or the thread is interrupted.
     */
    private def sleepFor(interval: Long): Unit = {
      timeoutCheckerLock.synchronized {
        try {
          timeoutCheckerLock.wait(interval)
        } catch {
          case _: InterruptedException =>
            // Deliberately ignored: the wait just ends early and the caller's loop
            // re-evaluates `shutdown` to decide whether to keep running.
        }
      }
    }
  }
  backgroundOperationPool.execute(timeoutChecker)
}