in kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala [132:233]
/**
 * Opens a session on the engine backing this Kyuubi session and posts the
 * updated session event.
 *
 * The configuration sent to the engine is `optimizedConf` plus the session
 * handle, plus the engine credentials (when non-empty) and the user-signing
 * entries (when `SESSION_USER_SIGN_ENABLED` is set). The engine address is
 * obtained from `engine.getOrCreate` on every pass of the retry loop.
 *
 * A `TTransportException` caused by a `java.net.ConnectException` whose message
 * contains "Connection refused" is retried after sleeping
 * `ENGINE_OPEN_RETRY_WAIT`, as long as `attempt < ENGINE_OPEN_MAX_ATTEMPTS`.
 * Any other failure is stored in `openSessionError` and rethrown inside the
 * enclosing `handleSessionException` block.
 *
 * @param extraEngineLog optional operation log passed through to
 *                       `engine.getOrCreate`
 */
private[kyuubi] def openEngineSession(extraEngineLog: Option[OperationLog] = None): Unit =
  handleSessionException {
    withDiscoveryClient(sessionConf) { discoveryClient =>
      var openEngineSessionConf =
        optimizedConf ++ Map(KYUUBI_SESSION_HANDLE_KEY -> handle.identifier.toString)
      if (engineCredentials.nonEmpty) {
        // The credentials are stored on sessionConf as well as sent to the engine.
        sessionConf.set(KYUUBI_ENGINE_CREDENTIALS_KEY, engineCredentials)
        openEngineSessionConf =
          openEngineSessionConf ++ Map(KYUUBI_ENGINE_CREDENTIALS_KEY -> engineCredentials)
      }
      if (sessionConf.get(SESSION_USER_SIGN_ENABLED)) {
        openEngineSessionConf = openEngineSessionConf +
          (SESSION_USER_SIGN_ENABLED.key ->
            sessionConf.get(SESSION_USER_SIGN_ENABLED).toString) +
          (KYUUBI_SESSION_SIGN_PUBLICKEY ->
            Base64.getEncoder.encodeToString(
              sessionManager.signingPublicKey.getEncoded)) +
          (KYUUBI_SESSION_USER_SIGN -> sessionUserSignBase64)
      }

      val maxAttempts = sessionManager.getConf.get(ENGINE_OPEN_MAX_ATTEMPTS)
      val retryWait = sessionManager.getConf.get(ENGINE_OPEN_RETRY_WAIT)
      val openOnFailure =
        EngineOpenOnFailure.withName(sessionManager.getConf.get(ENGINE_OPEN_ON_FAILURE))

      // Null-safe test for a refused connection. A type pattern never matches null,
      // and a ConnectException without a detail message is treated as "not refused",
      // so evaluating the catch guard below can no longer throw and bypass the
      // generic failure handler.
      def isConnectionRefused(cause: Throwable): Boolean = cause match {
        case refused: java.net.ConnectException =>
          Option(refused.getMessage).exists(_.contains("Connection refused"))
        case _ => false
      }

      var attempt = 0
      var shouldRetry = true
      while (attempt <= maxAttempts && shouldRetry) {
        val (host, port) = engine.getOrCreate(discoveryClient, extraEngineLog)

        // Best effort: a failure to de-register is logged and otherwise ignored.
        def deregisterEngine(): Unit =
          try {
            engine.deregister(discoveryClient, (host, port))
          } catch {
            case e: Throwable =>
              warn(s"Error on de-registering engine [${engine.engineSpace} $host:$port]", e)
          }

        var engineClient: KyuubiSyncThriftClient = null
        try {
          val passwd =
            if (sessionManager.getConf.get(ENGINE_SECURITY_ENABLED)) {
              InternalSecurityAccessor.get().issueToken()
            } else {
              Option(password).filter(_.nonEmpty).getOrElse("anonymous")
            }
          engineClient =
            KyuubiSyncThriftClient.createClient(user, passwd, host, port, sessionConf)
          _engineSessionHandle =
            engineClient.openSession(protocol, user, passwd, openEngineSessionConf)
          _client = engineClient
          logSessionInfo(s"Connected to engine [$host:$port]/[${client.engineId.getOrElse("")}]" +
            s" with ${_engineSessionHandle}]")
          shouldRetry = false
        } catch {
          // Retryable only while attempts remain; on the last pass the same failure
          // falls through to the generic handler below and is rethrown.
          case e: TTransportException
              if attempt < maxAttempts && isConnectionRefused(e.getCause) =>
            warn(
              s"Failed to open [${engine.defaultEngineName} $host:$port] after" +
                s" $attempt/$maxAttempts times, retrying",
              e.getCause)
            Thread.sleep(retryWait)
            openOnFailure match {
              case DEREGISTER_IMMEDIATELY => deregisterEngine()
              case _ =>
            }
            shouldRetry = true
          case e: Throwable =>
            error(
              s"Opening engine [${engine.defaultEngineName} $host:$port]" +
                s" for $user session failed",
              e)
            openSessionError = Some(e)
            openOnFailure match {
              case DEREGISTER_IMMEDIATELY | DEREGISTER_AFTER_RETRY => deregisterEngine()
              case _ =>
            }
            throw e
        } finally {
          attempt += 1
          // shouldRetry is still true exactly when this pass did not succeed, so the
          // client created during a failed pass is closed here.
          if (shouldRetry && engineClient != null) {
            try {
              engineClient.closeSession()
            } catch {
              case e: Throwable =>
                warn(
                  "Error on closing broken client of engine " +
                    s"[${engine.defaultEngineName} $host:$port]",
                  e)
            }
          }
        }
      }

      // Record the opened engine session on the session event and post it.
      sessionEvent.openedTime = System.currentTimeMillis()
      sessionEvent.remoteSessionId = _engineSessionHandle.identifier.toString
      _client.engineId.foreach(e => sessionEvent.engineId = e)
      _client.engineName.foreach(e => sessionEvent.engineName = e)
      _client.engineUrl.foreach(e => sessionEvent.engineUrl = e)
      EventBus.post(sessionEvent)
    }
  }