in thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala [483:523]
/**
 * Counts a new connection against every connection limit that is being tracked for the
 * given user and client address.
 *
 * The counters are bumped optimistically, one tracked dimension at a time (user, then
 * address, then user:address). As soon as a bumped counter exceeds its limit, every counter
 * bumped so far by this call is decremented again and the violation is handed to
 * `logAndThrowException` (which, judging by its name, is expected to abort the call).
 *
 * @param username           user opening the connection
 * @param ipAddress          address of the connection, passed to `getOriginClientIpAddress`
 * @param forwardedAddresses forwarded addresses, passed to `getOriginClientIpAddress`
 */
private[thriftserver] def incrementConnections(
    username: String,
    ipAddress: String,
    forwardedAddresses: util.List[String]): Unit = {
  val clientIpAddress: String = getOriginClientIpAddress(ipAddress, forwardedAddresses)
  val userAndAddress = username + ":" + clientIpAddress

  val userTracked = trackConnectionsPerUser(username)
  val addressTracked = trackConnectionsPerIpAddress(clientIpAddress)
  val userAndAddressTracked = trackConnectionsPerUserIpAddress(username, clientIpAddress)

  // Counters that an earlier stage has bumped and that a later, failing stage must undo.
  val userKeys = if (userTracked) List(username) else Nil
  val addressKeys = if (addressTracked) List(clientIpAddress) else Nil

  // Undoes the optimistic increments, in the same order in which they were applied.
  def rollBack(keys: List[String]): Unit =
    keys.foreach(key => decrementConnectionsCount(key))

  // Stage 1: per-user limit.
  if (userTracked && incrementConnectionsCount(username) > userLimit) {
    rollBack(List(username))
    logAndThrowException("Connection limit per user reached " +
      s"(user: $username limit: $userLimit)")
  }

  // Stage 2: per-address limit.
  if (addressTracked && incrementConnectionsCount(clientIpAddress) > ipAddressLimit) {
    rollBack(userKeys :+ clientIpAddress)
    logAndThrowException("Connection limit per ipaddress reached " +
      s"(ipaddress: $clientIpAddress limit: $ipAddressLimit)")
  }

  // Stage 3: combined user:address limit.
  if (userAndAddressTracked && incrementConnectionsCount(userAndAddress) > userIpAddressLimit) {
    rollBack((userKeys ++ addressKeys) :+ userAndAddress)
    logAndThrowException("Connection limit per user:ipaddress reached " +
      s"(user:ipaddress: $userAndAddress limit: $userIpAddressLimit)")
  }
}