private[kyuubi] def openEngineSession()

in kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala [132:233]


  private[kyuubi] def openEngineSession(extraEngineLog: Option[OperationLog] = None): Unit =
    handleSessionException {
      withDiscoveryClient(sessionConf) { discoveryClient =>
        var openEngineSessionConf =
          optimizedConf ++ Map(KYUUBI_SESSION_HANDLE_KEY -> handle.identifier.toString)
        if (engineCredentials.nonEmpty) {
          sessionConf.set(KYUUBI_ENGINE_CREDENTIALS_KEY, engineCredentials)
          openEngineSessionConf =
            openEngineSessionConf ++ Map(KYUUBI_ENGINE_CREDENTIALS_KEY -> engineCredentials)
        }

        if (sessionConf.get(SESSION_USER_SIGN_ENABLED)) {
          openEngineSessionConf = openEngineSessionConf +
            (SESSION_USER_SIGN_ENABLED.key ->
              sessionConf.get(SESSION_USER_SIGN_ENABLED).toString) +
            (KYUUBI_SESSION_SIGN_PUBLICKEY ->
              Base64.getEncoder.encodeToString(
                sessionManager.signingPublicKey.getEncoded)) +
            (KYUUBI_SESSION_USER_SIGN -> sessionUserSignBase64)
        }

        val maxAttempts = sessionManager.getConf.get(ENGINE_OPEN_MAX_ATTEMPTS)
        val retryWait = sessionManager.getConf.get(ENGINE_OPEN_RETRY_WAIT)
        val openOnFailure =
          EngineOpenOnFailure.withName(sessionManager.getConf.get(ENGINE_OPEN_ON_FAILURE))
        var attempt = 0
        var shouldRetry = true
        while (attempt <= maxAttempts && shouldRetry) {
          val (host, port) = engine.getOrCreate(discoveryClient, extraEngineLog)

          def deregisterEngine(): Unit =
            try {
              engine.deregister(discoveryClient, (host, port))
            } catch {
              case e: Throwable =>
                warn(s"Error on de-registering engine [${engine.engineSpace} $host:$port]", e)
            }

          var engineClient: KyuubiSyncThriftClient = null
          try {
            val passwd =
              if (sessionManager.getConf.get(ENGINE_SECURITY_ENABLED)) {
                InternalSecurityAccessor.get().issueToken()
              } else {
                Option(password).filter(_.nonEmpty).getOrElse("anonymous")
              }
            engineClient =
              KyuubiSyncThriftClient.createClient(user, passwd, host, port, sessionConf)
            _engineSessionHandle =
              engineClient.openSession(protocol, user, passwd, openEngineSessionConf)
            _client = engineClient
            logSessionInfo(s"Connected to engine [$host:$port]/[${client.engineId.getOrElse("")}]" +
              s" with ${_engineSessionHandle}]")
            shouldRetry = false
          } catch {
            case e: TTransportException
                if attempt < maxAttempts && e.getCause.isInstanceOf[java.net.ConnectException] &&
                  e.getCause.getMessage.contains("Connection refused") =>
              warn(
                s"Failed to open [${engine.defaultEngineName} $host:$port] after" +
                  s" $attempt/$maxAttempts times, retrying",
                e.getCause)
              Thread.sleep(retryWait)
              openOnFailure match {
                case DEREGISTER_IMMEDIATELY => deregisterEngine()
                case _ =>
              }
              shouldRetry = true
            case e: Throwable =>
              error(
                s"Opening engine [${engine.defaultEngineName} $host:$port]" +
                  s" for $user session failed",
                e)
              openSessionError = Some(e)
              openOnFailure match {
                case DEREGISTER_IMMEDIATELY | DEREGISTER_AFTER_RETRY => deregisterEngine()
                case _ =>
              }
              throw e
          } finally {
            attempt += 1
            if (shouldRetry && engineClient != null) {
              try {
                engineClient.closeSession()
              } catch {
                case e: Throwable =>
                  warn(
                    "Error on closing broken client of engine " +
                      s"[${engine.defaultEngineName} $host:$port]",
                    e)
              }
            }
          }
        }
        sessionEvent.openedTime = System.currentTimeMillis()
        sessionEvent.remoteSessionId = _engineSessionHandle.identifier.toString
        _client.engineId.foreach(e => sessionEvent.engineId = e)
        _client.engineName.foreach(e => sessionEvent.engineName = e)
        _client.engineUrl.foreach(e => sessionEvent.engineUrl = e)
        EventBus.post(sessionEvent)
      }
    }