override protected def hadoopConf: Configuration = KyuubiServer.getHadoopConf()

in kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiTHttpFrontendService.scala [77:233]


  override protected def hadoopConf: Configuration = KyuubiServer.getHadoopConf()

  private lazy val defaultFetchSize = conf.get(KYUUBI_SERVER_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE)

  /**
   * Configure Jetty to serve http requests. Example of a client connection URL:
   * http://localhost:10000/servlets/thrifths2/ A gateway may cause actual target
   * URL to differ, e.g. http://gateway:port/hive2/servlets/thrifths2/.
   *
   * @param conf the configuration of the service
   */
  override def initialize(conf: KyuubiConf): Unit = synchronized {
    this.conf = conf
    if (authFactory.kerberosEnabled && authFactory.effectivePlainAuthType.isEmpty) {
      throw new AuthenticationException("Kerberos is not supported for Thrift HTTP mode")
    }

    try {
      // Server thread pool
      // Start with minWorkerThreads, expand till maxWorkerThreads and reject
      // subsequent requests
      val minThreads = conf.get(FRONTEND_THRIFT_MIN_WORKER_THREADS)
      val maxThreads = conf.get(FRONTEND_THRIFT_MAX_WORKER_THREADS)
      val keepAliveTime = conf.get(FRONTEND_THRIFT_WORKER_KEEPALIVE_TIME)
      val executor = new ThreadPoolExecutor(
        minThreads,
        maxThreads,
        keepAliveTime,
        TimeUnit.MILLISECONDS,
        new SynchronousQueue[Runnable](),
        new NamedThreadFactory(getName + "HttpHandler-Pool", false))
      val threadPool = new ExecutorThreadPool(executor)

      // HTTP Server
      server = Some(new Server(threadPool))

      val httpConf = new HttpConfiguration
      // Configure header size
      val requestHeaderSize = conf.get(FRONTEND_THRIFT_HTTP_REQUEST_HEADER_SIZE)
      val responseHeaderSize = conf.get(FRONTEND_THRIFT_HTTP_RESPONSE_HEADER_SIZE)
      val jettySendVersionEnabled = conf.get(FRONTEND_JETTY_SEND_VERSION_ENABLED)
      httpConf.setRequestHeaderSize(requestHeaderSize)
      httpConf.setResponseHeaderSize(responseHeaderSize)
      httpConf.setSendServerVersion(jettySendVersionEnabled)
      val connectionFactory = new HttpConnectionFactory(httpConf)

      val useSsl = conf.get(FRONTEND_THRIFT_HTTP_USE_SSL)
      val schemeName = if (useSsl) "https" else "http"

      // Change connector if SSL is used
      val connector =
        if (useSsl) {
          keyStorePath = conf.get(FRONTEND_THRIFT_HTTP_SSL_KEYSTORE_PATH)

          if (keyStorePath.isEmpty) {
            throw new IllegalArgumentException(FRONTEND_THRIFT_HTTP_SSL_KEYSTORE_PATH.key +
              " Not configured for SSL connection, please set the key with: " +
              FRONTEND_THRIFT_HTTP_SSL_KEYSTORE_PATH.doc)
          }

          keyStorePassword = conf.get(FRONTEND_THRIFT_HTTP_SSL_KEYSTORE_PASSWORD)
          if (keyStorePassword.isEmpty) {
            throw new IllegalArgumentException(FRONTEND_THRIFT_HTTP_SSL_KEYSTORE_PASSWORD.key +
              " Not configured for SSL connection. please set the key with: " +
              FRONTEND_THRIFT_HTTP_SSL_KEYSTORE_PASSWORD.doc)
          }

          val sslContextFactory = new SslContextFactory.Server
          val excludedProtocols = conf.get(FRONTEND_THRIFT_HTTP_SSL_PROTOCOL_BLACKLIST)
          val excludeCipherSuites = conf.get(FRONTEND_THRIFT_HTTP_SSL_EXCLUDE_CIPHER_SUITES)
          keyStoreType = conf.get(FRONTEND_SSL_KEYSTORE_TYPE)
          val keyStoreAlgorithm = conf.get(FRONTEND_SSL_KEYSTORE_ALGORITHM)
          info("Thrift HTTP Server SSL: adding excluded protocols: " +
            String.join(",", excludedProtocols: _*))
          sslContextFactory.addExcludeProtocols(excludedProtocols: _*)
          info("Thrift HTTP Server SSL: SslContextFactory.getExcludeProtocols = " +
            String.join(",", sslContextFactory.getExcludeProtocols: _*))
          info("Thrift HTTP Server SSL: setting excluded cipher Suites: " +
            String.join(",", excludeCipherSuites: _*))
          sslContextFactory.setExcludeCipherSuites(excludeCipherSuites: _*)
          info("Thrift HTTP Server SSL: SslContextFactory.getExcludeCipherSuites = " +
            String.join(",", sslContextFactory.getExcludeCipherSuites: _*))
          sslContextFactory.setKeyStorePath(keyStorePath.get)
          sslContextFactory.setKeyStorePassword(keyStorePassword.get)
          keyStoreType.foreach(sslContextFactory.setKeyStoreType)
          keyStoreAlgorithm.foreach(sslContextFactory.setKeyManagerFactoryAlgorithm)
          new ServerConnector(
            server.get,
            sslContextFactory,
            connectionFactory)
        } else {
          new ServerConnector(server.get, connectionFactory)
        }

      connector.setPort(portNum)
      // Linux:yes, Windows:no
      // result of setting the SO_REUSEADDR flag is different on Windows
      // http://msdn.microsoft.com/en-us/library/ms740621(v=vs.85).aspx
      // without this 2 NN's can start on the same machine and listen on
      // the same port with indeterminate routing of incoming requests to them
      connector.setReuseAddress(!SystemUtils.IS_OS_WINDOWS)
      val maxIdleTime = conf.get(FRONTEND_THRIFT_HTTP_MAX_IDLE_TIME)
      connector.setIdleTimeout(maxIdleTime)
      connector.setAcceptQueueSize(maxThreads)
      server.foreach(_.addConnector(connector))

      val processor = new TCLIService.Processor[TCLIService.Iface](this)
      val protocolFactory = new TBinaryProtocol.Factory
      val servlet = new ThriftHttpServlet(processor, protocolFactory, authFactory, conf)
      servlet.init()

      // Context handler
      val context = new ServletContextHandler(ServletContextHandler.SESSIONS)
      context.setContextPath("/")

      context.addEventListener(new ServletContextListener() {
        override def contextInitialized(servletContextEvent: ServletContextEvent): Unit = {
          MetricsSystem.tracing { ms =>
            ms.incCount(THRIFT_HTTP_CONN_TOTAL)
            ms.incCount(THRIFT_HTTP_CONN_OPEN)
          }
        }

        override def contextDestroyed(servletContextEvent: ServletContextEvent): Unit = {
          MetricsSystem.tracing { ms =>
            ms.decCount(THRIFT_HTTP_CONN_OPEN)
          }
        }
      })

      val httpPath = getHttpPath(conf.get(FRONTEND_THRIFT_HTTP_PATH))

      if (conf.get(FRONTEND_THRIFT_HTTP_COMPRESSION_ENABLED)) {
        val gzipHandler = new GzipHandler
        gzipHandler.setHandler(context)
        gzipHandler.addIncludedMethods(HttpMethod.POST.asString())
        gzipHandler.addIncludedMimeTypes(APPLICATION_THRIFT)
        server.foreach(_.setHandler(gzipHandler))
      } else {
        server.foreach(_.setHandler(context))
      }

      context.addServlet(new ServletHolder(servlet), httpPath)
      constrainHttpMethods(context)

      info(s"Started ${getClass.getSimpleName} in $schemeName mode on port $portNum " +
        s"path=$httpPath with $minThreads ... $maxThreads threads")
    } catch {
      case e: Throwable =>
        MetricsSystem.tracing(_.incCount(THRIFT_HTTP_CONN_FAIL))
        error(e)
        throw new KyuubiException(
          s"Failed to initialize frontend service on $serverAddr:$portNum.",
          e)
    }
    super.initialize(conf)
  }