in kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiTHttpFrontendService.scala [77:233]
override protected def hadoopConf: Configuration = KyuubiServer.getHadoopConf()
private lazy val defaultFetchSize = conf.get(KYUUBI_SERVER_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE)
/**
* Configure Jetty to serve http requests. Example of a client connection URL:
* http://localhost:10000/servlets/thrifths2/ A gateway may cause actual target
* URL to differ, e.g. http://gateway:port/hive2/servlets/thrifths2/.
*
* @param conf the configuration of the service
*/
override def initialize(conf: KyuubiConf): Unit = synchronized {
this.conf = conf
if (authFactory.kerberosEnabled && authFactory.effectivePlainAuthType.isEmpty) {
throw new AuthenticationException("Kerberos is not supported for Thrift HTTP mode")
}
try {
// Server thread pool
// Start with minWorkerThreads, expand till maxWorkerThreads and reject
// subsequent requests
val minThreads = conf.get(FRONTEND_THRIFT_MIN_WORKER_THREADS)
val maxThreads = conf.get(FRONTEND_THRIFT_MAX_WORKER_THREADS)
val keepAliveTime = conf.get(FRONTEND_THRIFT_WORKER_KEEPALIVE_TIME)
val executor = new ThreadPoolExecutor(
minThreads,
maxThreads,
keepAliveTime,
TimeUnit.MILLISECONDS,
new SynchronousQueue[Runnable](),
new NamedThreadFactory(getName + "HttpHandler-Pool", false))
val threadPool = new ExecutorThreadPool(executor)
// HTTP Server
server = Some(new Server(threadPool))
val httpConf = new HttpConfiguration
// Configure header size
val requestHeaderSize = conf.get(FRONTEND_THRIFT_HTTP_REQUEST_HEADER_SIZE)
val responseHeaderSize = conf.get(FRONTEND_THRIFT_HTTP_RESPONSE_HEADER_SIZE)
val jettySendVersionEnabled = conf.get(FRONTEND_JETTY_SEND_VERSION_ENABLED)
httpConf.setRequestHeaderSize(requestHeaderSize)
httpConf.setResponseHeaderSize(responseHeaderSize)
httpConf.setSendServerVersion(jettySendVersionEnabled)
val connectionFactory = new HttpConnectionFactory(httpConf)
val useSsl = conf.get(FRONTEND_THRIFT_HTTP_USE_SSL)
val schemeName = if (useSsl) "https" else "http"
// Change connector if SSL is used
val connector =
if (useSsl) {
keyStorePath = conf.get(FRONTEND_THRIFT_HTTP_SSL_KEYSTORE_PATH)
if (keyStorePath.isEmpty) {
throw new IllegalArgumentException(FRONTEND_THRIFT_HTTP_SSL_KEYSTORE_PATH.key +
" Not configured for SSL connection, please set the key with: " +
FRONTEND_THRIFT_HTTP_SSL_KEYSTORE_PATH.doc)
}
keyStorePassword = conf.get(FRONTEND_THRIFT_HTTP_SSL_KEYSTORE_PASSWORD)
if (keyStorePassword.isEmpty) {
throw new IllegalArgumentException(FRONTEND_THRIFT_HTTP_SSL_KEYSTORE_PASSWORD.key +
" Not configured for SSL connection. please set the key with: " +
FRONTEND_THRIFT_HTTP_SSL_KEYSTORE_PASSWORD.doc)
}
val sslContextFactory = new SslContextFactory.Server
val excludedProtocols = conf.get(FRONTEND_THRIFT_HTTP_SSL_PROTOCOL_BLACKLIST)
val excludeCipherSuites = conf.get(FRONTEND_THRIFT_HTTP_SSL_EXCLUDE_CIPHER_SUITES)
keyStoreType = conf.get(FRONTEND_SSL_KEYSTORE_TYPE)
val keyStoreAlgorithm = conf.get(FRONTEND_SSL_KEYSTORE_ALGORITHM)
info("Thrift HTTP Server SSL: adding excluded protocols: " +
String.join(",", excludedProtocols: _*))
sslContextFactory.addExcludeProtocols(excludedProtocols: _*)
info("Thrift HTTP Server SSL: SslContextFactory.getExcludeProtocols = " +
String.join(",", sslContextFactory.getExcludeProtocols: _*))
info("Thrift HTTP Server SSL: setting excluded cipher Suites: " +
String.join(",", excludeCipherSuites: _*))
sslContextFactory.setExcludeCipherSuites(excludeCipherSuites: _*)
info("Thrift HTTP Server SSL: SslContextFactory.getExcludeCipherSuites = " +
String.join(",", sslContextFactory.getExcludeCipherSuites: _*))
sslContextFactory.setKeyStorePath(keyStorePath.get)
sslContextFactory.setKeyStorePassword(keyStorePassword.get)
keyStoreType.foreach(sslContextFactory.setKeyStoreType)
keyStoreAlgorithm.foreach(sslContextFactory.setKeyManagerFactoryAlgorithm)
new ServerConnector(
server.get,
sslContextFactory,
connectionFactory)
} else {
new ServerConnector(server.get, connectionFactory)
}
connector.setPort(portNum)
// Linux:yes, Windows:no
// result of setting the SO_REUSEADDR flag is different on Windows
// http://msdn.microsoft.com/en-us/library/ms740621(v=vs.85).aspx
// without this 2 NN's can start on the same machine and listen on
// the same port with indeterminate routing of incoming requests to them
connector.setReuseAddress(!SystemUtils.IS_OS_WINDOWS)
val maxIdleTime = conf.get(FRONTEND_THRIFT_HTTP_MAX_IDLE_TIME)
connector.setIdleTimeout(maxIdleTime)
connector.setAcceptQueueSize(maxThreads)
server.foreach(_.addConnector(connector))
val processor = new TCLIService.Processor[TCLIService.Iface](this)
val protocolFactory = new TBinaryProtocol.Factory
val servlet = new ThriftHttpServlet(processor, protocolFactory, authFactory, conf)
servlet.init()
// Context handler
val context = new ServletContextHandler(ServletContextHandler.SESSIONS)
context.setContextPath("/")
context.addEventListener(new ServletContextListener() {
override def contextInitialized(servletContextEvent: ServletContextEvent): Unit = {
MetricsSystem.tracing { ms =>
ms.incCount(THRIFT_HTTP_CONN_TOTAL)
ms.incCount(THRIFT_HTTP_CONN_OPEN)
}
}
override def contextDestroyed(servletContextEvent: ServletContextEvent): Unit = {
MetricsSystem.tracing { ms =>
ms.decCount(THRIFT_HTTP_CONN_OPEN)
}
}
})
val httpPath = getHttpPath(conf.get(FRONTEND_THRIFT_HTTP_PATH))
if (conf.get(FRONTEND_THRIFT_HTTP_COMPRESSION_ENABLED)) {
val gzipHandler = new GzipHandler
gzipHandler.setHandler(context)
gzipHandler.addIncludedMethods(HttpMethod.POST.asString())
gzipHandler.addIncludedMimeTypes(APPLICATION_THRIFT)
server.foreach(_.setHandler(gzipHandler))
} else {
server.foreach(_.setHandler(context))
}
context.addServlet(new ServletHolder(servlet), httpPath)
constrainHttpMethods(context)
info(s"Started ${getClass.getSimpleName} in $schemeName mode on port $portNum " +
s"path=$httpPath with $minThreads ... $maxThreads threads")
} catch {
case e: Throwable =>
MetricsSystem.tracing(_.incCount(THRIFT_HTTP_CONN_FAIL))
error(e)
throw new KyuubiException(
s"Failed to initialize frontend service on $serverAddr:$portNum.",
e)
}
super.initialize(conf)
}