in thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala [235:292]
def openSession(
protocol: TProtocolVersion,
username: String,
password: String,
ipAddress: String,
sessionConf: JMap[String, String],
withImpersonation: Boolean,
delegationToken: String): SessionHandle = {
val sessionHandle = new SessionHandle(protocol)
incrementConnections(username, ipAddress, SessionInfo.getForwardedAddresses)
sessionInfo.put(sessionHandle,
new SessionInfo(username, ipAddress, SessionInfo.getForwardedAddresses, protocol))
val (initStatements, createInteractiveRequest, sessionId) =
LivyThriftSessionManager.processSessionConf(sessionConf, supportUseDatabase)
val createLivySession = () => {
createInteractiveRequest.kind = Spark
val newSession = InteractiveSession.create(
server.livySessionManager.nextId(),
createInteractiveRequest.name,
username,
None,
server.livyConf,
server.accessManager,
createInteractiveRequest,
server.sessionStore,
None,
None)
onLivySessionOpened(newSession)
newSession
}
val futureLivySession = Future {
val livyServiceUGI = UserGroupInformation.getCurrentUser
var livySession: InteractiveSession = null
try {
livyServiceUGI.doAs(new PrivilegedExceptionAction[InteractiveSession] {
override def run(): InteractiveSession = {
livySession =
getOrCreateLivySession(sessionHandle, sessionId, createInteractiveRequest.name,
username, createLivySession)
synchronized {
managedLivySessionActiveUsers.get(livySession.id).foreach { numUsers =>
managedLivySessionActiveUsers(livySession.id) = numUsers + 1
}
}
initSession(sessionHandle, livySession, initStatements)
livySession
}
})
} catch {
case e: UndeclaredThrowableException =>
throw new ThriftSessionCreationException(Option(livySession), e.getCause)
case e: Throwable =>
throw new ThriftSessionCreationException(Option(livySession), e)
}
}
sessionHandleToLivySession.put(sessionHandle, futureLivySession)
sessionHandle
}