in thriftserver/server/src/main/scala/org/apache/livy/thriftserver/cli/ThriftHttpCLIService.scala [59:157]
/**
 * Builds, configures and starts the embedded Jetty server that exposes the Thrift CLI
 * service over HTTP or HTTPS.
 *
 * The steps are, in order:
 *  - create a bounded worker pool (`minWorkerThreads`..`maxWorkerThreads`) backed by a
 *    `SynchronousQueue`;
 *  - create the connector, using an `SslContextFactory` when `LivyConf.THRIFT_USE_SSL` is set;
 *  - register a `ThriftHttpServlet` under the configured HTTP path, wrapped in a
 *    `GzipHandler` when HTTP compression is enabled;
 *  - start the server.
 *
 * @throws RuntimeException wrapping any `Exception` raised while configuring or starting the
 *                          server (including the `IllegalArgumentException` thrown when SSL is
 *                          requested but no keystore path is configured).
 */
protected def initServer(): Unit = {
  try {
    // Server thread pool
    // Start with minWorkerThreads, expand till maxWorkerThreads and reject subsequent requests
    val executorService = new ThreadPoolExecutorWithOomHook(
      minWorkerThreads,
      maxWorkerThreads,
      workerKeepAliveTime,
      TimeUnit.SECONDS,
      new SynchronousQueue[Runnable],
      new ThreadFactoryWithGarbageCleanup("LivyThriftserver-HttpHandler-Pool"),
      oomHook)
    val threadPool = new ExecutorThreadPool(executorService)

    // HTTP Server
    server = new Server(threadPool)

    val conf = new HttpConfiguration
    // Configure header size
    val requestHeaderSize = livyConf.getInt(LivyConf.THRIFT_HTTP_REQUEST_HEADER_SIZE)
    val responseHeaderSize = livyConf.getInt(LivyConf.THRIFT_HTTP_RESPONSE_HEADER_SIZE)
    conf.setRequestHeaderSize(requestHeaderSize)
    conf.setResponseHeaderSize(responseHeaderSize)
    val http = new HttpConnectionFactory(conf)

    val useSsl = livyConf.getBoolean(LivyConf.THRIFT_USE_SSL)
    val schemeName = if (useSsl) "https" else "http"

    // Change connector if SSL is used
    val connector = if (useSsl) {
      val keyStorePath = livyConf.get(LivyConf.SSL_KEYSTORE).trim
      if (keyStorePath.isEmpty) {
        throw new IllegalArgumentException(
          s"${LivyConf.SSL_KEYSTORE.key} Not configured for SSL connection")
      }
      val keyStorePassword = getKeyStorePassword()
      val keystoreType = livyConf.get(LivyConf.SSL_KEYSTORE_TYPE)
      val sslContextFactory = new SslContextFactory.Server()
      // Trim each entry and drop blanks so that an empty or space-padded blacklist does not
      // register empty/padded protocol names.
      val excludedProtocols = livyConf.get(LivyConf.THRIFT_SSL_PROTOCOL_BLACKLIST)
        .split(",")
        .map(_.trim)
        .filter(_.nonEmpty)
      // Arrays must be rendered explicitly; interpolating them prints only the array identity.
      info(s"HTTP Server SSL: adding excluded protocols: ${excludedProtocols.mkString(", ")}")
      sslContextFactory.addExcludeProtocols(excludedProtocols: _*)
      info("HTTP Server SSL: SslContextFactory.getExcludeProtocols = " +
        sslContextFactory.getExcludeProtocols.mkString(", "))
      sslContextFactory.setKeyStorePath(keyStorePath)
      sslContextFactory.setKeyStorePassword(keyStorePassword)
      sslContextFactory.setKeyStoreType(keystoreType)
      new ServerConnector(server, sslContextFactory, http)
    } else {
      new ServerConnector(server, http)
    }

    connector.setPort(portNum)
    // Linux: yes, Windows:no
    connector.setReuseAddress(true)
    // setIdleTimeout accepts a long number of milliseconds, so no narrowing cast is needed
    // (a cast to Int would overflow for very large configured timeouts).
    val maxIdleTime = livyConf.getTimeAsMs(LivyConf.THRIFT_HTTP_MAX_IDLE_TIME)
    connector.setIdleTimeout(maxIdleTime)
    server.addConnector(connector)

    // Thrift configs
    val processor = new TCLIService.Processor[TCLIService.Iface](this)
    val protocolFactory = new TBinaryProtocol.Factory
    // Set during the init phase of LivyThriftserver if auth mode is kerberos
    // UGI for the livy/_HOST (kerberos) principal
    val serviceUGI = cliService.getServiceUGI
    // UGI for the http/_HOST (SPNego) principal
    val httpUGI = cliService.getHttpUGI
    val authType = livyConf.get(LivyConf.THRIFT_AUTHENTICATION)
    val thriftHttpServlet = new ThriftHttpServlet(
      processor,
      protocolFactory,
      authType,
      serviceUGI,
      httpUGI,
      hiveAuthFactory,
      livyConf)

    // Context handler
    val context = new ServletContextHandler(ServletContextHandler.SESSIONS)
    context.setContextPath("/")
    if (livyConf.getBoolean(LivyConf.THRIFT_XSRF_FILTER_ENABLED)) {
      // Filtering does not work here currently, doing filter in ThriftHttpServlet
      debug("XSRF filter enabled")
    } else {
      warn("XSRF filter disabled")
    }

    val httpPath = getHttpPath(livyConf.get(LivyConf.THRIFT_HTTP_PATH))

    // Response compression is governed by its own setting; it must not be tied to the
    // XSRF filter flag checked above.
    if (livyConf.getBoolean(LivyConf.THRIFT_HTTP_COMPRESSION_ENABLED)) {
      val gzipHandler = new GzipHandler
      gzipHandler.setHandler(context)
      gzipHandler.addIncludedMethods(HttpMethod.POST)
      gzipHandler.addIncludedMimeTypes(ThriftHttpCLIService.APPLICATION_THRIFT)
      server.setHandler(gzipHandler)
    } else {
      server.setHandler(context)
    }
    context.addServlet(new ServletHolder(thriftHttpServlet), httpPath)

    // TODO: check defaults: maxTimeout, keepalive, maxBodySize,
    // bodyReceiveDuration, etc.
    // Finally, start the server
    server.start()
    info(s"Started ${classOf[ThriftHttpCLIService].getSimpleName} in $schemeName mode on port " +
      s"$portNum path=$httpPath with $minWorkerThreads...$maxWorkerThreads worker threads")
  } catch {
    // Surface every configuration/startup failure under a single, recognizable error.
    case e: Exception => throw new RuntimeException("Failed to init HttpServer", e)
  }
}