protected def initServer()

in thriftserver/server/src/main/scala/org/apache/livy/thriftserver/cli/ThriftBinaryCLIService.scala [47:157]


  /**
   * Builds the binary-transport Thrift server and stores it in `server`.
   *
   * The method creates the worker thread pool, opens a plain or SSL server socket for
   * `hiveHost`/`portNum`, assembles the `TThreadPoolServer` arguments from `livyConf` and
   * registers a server event handler that tracks the per-connection context and optionally
   * closes sessions whose client disconnected without closing them.
   *
   * @throws RuntimeException wrapping any `Exception` raised while setting the server up
   */
  protected def initServer(): Unit = {
    try {
      // Server thread pool
      val executorService = new ThreadPoolExecutorWithOomHook(
        minWorkerThreads,
        maxWorkerThreads,
        workerKeepAliveTime,
        TimeUnit.SECONDS,
        new SynchronousQueue[Runnable],
        new ThreadFactoryWithGarbageCleanup("LivyThriftserver-Handler-Pool"),
        oomHook)
      // Thrift configs
      val transportFactory: TTransportFactory = hiveAuthFactory.getAuthTransFactory
      val processorFactory: TProcessorFactory = hiveAuthFactory.getAuthProcFactory(this)
      val serverAddress = if (hiveHost == null || hiveHost.isEmpty) {
          new InetSocketAddress(portNum) // Wildcard bind
        } else {
          new InetSocketAddress(hiveHost, portNum)
        }
      val serverSocket: TServerSocket = if (!livyConf.getBoolean(LivyConf.THRIFT_USE_SSL)) {
        new TServerSocket(serverAddress)
      } else {
        // Protocol names are compared case-insensitively; Locale.ROOT keeps the lower-casing
        // independent of the JVM's default locale.
        val sslVersionBlacklist = livyConf.get(LivyConf.THRIFT_SSL_PROTOCOL_BLACKLIST)
          .split(",")
          .map(_.trim.toLowerCase(util.Locale.ROOT))
          .toSet
        // A missing (null) keystore setting is reported the same way as a blank one instead of
        // surfacing as a NullPointerException from `trim`.
        val keyStorePath = Option(livyConf.get(LivyConf.SSL_KEYSTORE))
          .map(_.trim)
          .filter(_.nonEmpty)
          .getOrElse {
            throw new IllegalArgumentException(
              s"${LivyConf.SSL_KEYSTORE.key} Not configured for SSL connection")
          }
        val keyStorePassword = getKeyStorePassword()
        val keystoreType = livyConf.get(LivyConf.SSL_KEYSTORE_TYPE)
        val params = new TSSLTransportFactory.TSSLTransportParameters
        params.setKeyStore(keyStorePath, keyStorePassword, null, keystoreType)
        val sslSocket =
          TSSLTransportFactory.getServerSocket(portNum, 0, serverAddress.getAddress, params)
        sslSocket.getServerSocket match {
          case sslServerSocket: SSLServerSocket =>
            val (disabledProtocols, enabledProtocols) =
              sslServerSocket.getEnabledProtocols.partition { protocol =>
                sslVersionBlacklist.contains(protocol.toLowerCase(util.Locale.ROOT))
              }
            disabledProtocols.foreach { protocol =>
              debug(s"Disabling SSL Protocol: $protocol")
            }
            sslServerSocket.setEnabledProtocols(enabledProtocols)
            // mkString: interpolating the array itself would only print its identity hash.
            val activeProtocols = sslServerSocket.getEnabledProtocols.mkString(", ")
            info(s"SSL Server Socket Enabled Protocols: $activeProtocols")
          case _ => // Not an SSL socket: nothing to restrict.
        }
        sslSocket
      }
      // Server args
      val maxMessageSize = livyConf.getInt(LivyConf.THRIFT_MAX_MESSAGE_SIZE)
      val requestTimeout = livyConf.getTimeAsMs(LivyConf.THRIFT_LOGIN_TIMEOUT).toInt
      val beBackoffSlotLength =
        livyConf.getTimeAsMs(LivyConf.THRIFT_LOGIN_BEBACKOFF_SLOT_LENGTH).toInt
      val sargs = new TThreadPoolServer.Args(serverSocket)
        .processorFactory(processorFactory)
        .transportFactory(transportFactory)
        .protocolFactory(new TBinaryProtocol.Factory)
        .inputProtocolFactory(
          new TBinaryProtocol.Factory(true, true, maxMessageSize, maxMessageSize))
        .requestTimeout(requestTimeout)
        .requestTimeoutUnit(TimeUnit.MILLISECONDS)
        .beBackoffSlotLength(beBackoffSlotLength)
        .beBackoffSlotLengthUnit(TimeUnit.MILLISECONDS)
        .executorService(executorService)
      // TCP Server
      server = new TThreadPoolServer(sargs)
      server.setServerEventHandler(new TServerEventHandler() {
        override def createContext(input: TProtocol, output: TProtocol): ServerContext = {
          new ThriftCLIServerContext
        }

        // Invoked with the context produced by createContext; a session handle still being
        // present on it is treated as a client that disconnected without closing its session.
        override def deleteContext(
            serverContext: ServerContext,
            input: TProtocol,
            output: TProtocol): Unit = {
          val context = serverContext.asInstanceOf[ThriftCLIServerContext]
          val sessionHandle = context.getSessionHandle
          if (sessionHandle != null) {
            info("Session disconnected without closing properly. ")
            try {
              val close = livyConf.getBoolean(LivyConf.THRIFT_CLOSE_SESSION_ON_DISCONNECT)
              // Log what is actually done: the session is only closed when configured so.
              val action = if (close) "Closing" else "Not closing"
              info(s"$action the session: $sessionHandle")
              if (close) {
                cliService.closeSession(sessionHandle)
              }
            } catch {
              case e: HiveSQLException => warn("Failed to close session: " + e, e)
            }
          }
        }

        override def preServe(): Unit = {}

        // Records the connection's context in `currentServerContext` for the current call.
        override def processContext(
            serverContext: ServerContext,
            input: TTransport,
            output: TTransport): Unit = {
          currentServerContext.set(serverContext)
        }
      })
      info(s"Starting ${classOf[ThriftBinaryCLIService].getSimpleName} on port $portNum " +
        s"with $minWorkerThreads...$maxWorkerThreads worker threads")
    } catch {
      case e: Exception => throw new RuntimeException("Failed to init thrift server", e)
    }
  }