override def process()

in thriftserver/server/src/main/scala/org/apache/livy/thriftserver/auth/AuthBridgeServer.scala [218:282]


  override def process(inProt: TProtocol, outProt: TProtocol): Boolean = {
    val trans = inProt.getTransport
    if (!trans.isInstanceOf[TSaslServerTransport]) {
      throw new TException(s"Unexpected non-SASL transport ${trans.getClass}")
    }
    val saslTrans: TSaslServerTransport = trans.asInstanceOf[TSaslServerTransport]
    val saslServer: SaslServer = saslTrans.getSaslServer
    val authId: String = saslServer.getAuthorizationID
    debug(s"AUTH ID ======> $authId")
    var endUser = authId
    val socket = saslTrans.getUnderlyingTransport.asInstanceOf[TSocket].getSocket
    AuthBridgeServer.remoteAddress.set(socket.getInetAddress)
    val mechanismName: String = saslServer.getMechanismName
    AuthBridgeServer.userAuthMechanism.set(mechanismName)
    if (AuthMethod.PLAIN.getMechanismName.equalsIgnoreCase(mechanismName)) {
      AuthBridgeServer.remoteUser.set(endUser)
      return wrapped.process(inProt, outProt)
    }
    AuthBridgeServer.authenticationMethod.set(UserGroupInformation.AuthenticationMethod.KERBEROS)
    if (AuthMethod.TOKEN.getMechanismName.equalsIgnoreCase(mechanismName)) {
      try {
        val tokenId = SaslRpcServer.getIdentifier(authId, secretManager)
        endUser = tokenId.getUser.getUserName
        AuthBridgeServer.authenticationMethod.set(UserGroupInformation.AuthenticationMethod.TOKEN)
      } catch {
        case e: InvalidToken => throw new TException(e.getMessage)
      }
    }
    var clientUgi: UserGroupInformation = null
    try {
      if (useProxy) {
        clientUgi = UserGroupInformation.createProxyUser(
          endUser, UserGroupInformation.getLoginUser)
        AuthBridgeServer.remoteUser.set(clientUgi.getShortUserName)
        debug(s"Set remoteUser : ${AuthBridgeServer.remoteUser.get}")
        clientUgi.doAs(new PrivilegedExceptionAction[Boolean]() {
          override def run: Boolean = try {
            wrapped.process(inProt, outProt)
          } catch {
            case te: TException => throw new RuntimeException(te)
          }
        })
      } else {
        // use the short user name for the request
        val endUserUgi: UserGroupInformation = UserGroupInformation.createRemoteUser(endUser)
        AuthBridgeServer.remoteUser.set(endUserUgi.getShortUserName)
        debug(s"Set remoteUser: ${AuthBridgeServer.remoteUser.get}, from endUser :" + endUser)
        wrapped.process(inProt, outProt)
      }
    } catch {
      case rte: RuntimeException if rte.getCause.isInstanceOf[TException] => throw rte.getCause
      case rte: RuntimeException => throw rte
      case ie: InterruptedException => throw new RuntimeException(ie) // unexpected!
      case ioe: IOException => throw new RuntimeException(ioe)
    } finally {
      if (clientUgi != null) {
        try {
          FileSystem.closeAllForUGI(clientUgi)
        } catch {
          case exception: IOException =>
            error(s"Could not clean up file-system handles for UGI: $clientUgi", exception)
        }
      }
    }
  }