in thriftserver/server/src/main/scala/org/apache/livy/thriftserver/cli/ThriftBinaryCLIService.scala [47:157]
/**
 * Builds the binary-protocol Thrift server and stores it in `server`.
 *
 * The steps are: create the handler thread pool, open a plain or SSL server socket on
 * `hiveHost`/`portNum` (wildcard bind when no host is configured), assemble the
 * `TThreadPoolServer` arguments from `livyConf`, and install an event handler that tracks the
 * per-connection server context. This method only constructs the server; it does not call
 * `serve()` itself.
 *
 * @throws RuntimeException wrapping any `Exception` raised while setting the server up
 */
protected def initServer(): Unit = {
  try {
    // Server thread pool
    val executorService = new ThreadPoolExecutorWithOomHook(
      minWorkerThreads,
      maxWorkerThreads,
      workerKeepAliveTime,
      TimeUnit.SECONDS,
      new SynchronousQueue[Runnable],
      new ThreadFactoryWithGarbageCleanup("LivyThriftserver-Handler-Pool"),
      oomHook)

    // Thrift configs
    val transportFactory: TTransportFactory = hiveAuthFactory.getAuthTransFactory
    val processorFactory: TProcessorFactory = hiveAuthFactory.getAuthProcFactory(this)
    val serverAddress = if (hiveHost == null || hiveHost.isEmpty) {
      new InetSocketAddress(portNum) // Wildcard bind
    } else {
      new InetSocketAddress(hiveHost, portNum)
    }

    val serverSocket: TServerSocket = if (!livyConf.getBoolean(LivyConf.THRIFT_USE_SSL)) {
      new TServerSocket(serverAddress)
    } else {
      // Protocol names are compared case-insensitively, so normalise the configured entries.
      val sslVersionBlacklist: Set[String] =
        livyConf.get(LivyConf.THRIFT_SSL_PROTOCOL_BLACKLIST)
          .split(",")
          .map(_.trim.toLowerCase)
          .toSet
      val keyStorePath = livyConf.get(LivyConf.SSL_KEYSTORE).trim
      if (keyStorePath.isEmpty) {
        throw new IllegalArgumentException(
          s"${LivyConf.SSL_KEYSTORE.key} Not configured for SSL connection")
      }
      val keyStorePassword = getKeyStorePassword()
      val keystoreType = livyConf.get(LivyConf.SSL_KEYSTORE_TYPE)
      val params = new TSSLTransportFactory.TSSLTransportParameters
      params.setKeyStore(keyStorePath, keyStorePassword, null, keystoreType)
      val sslSocket =
        TSSLTransportFactory.getServerSocket(portNum, 0, serverAddress.getAddress, params)
      sslSocket.getServerSocket match {
        case sslServerSocket: SSLServerSocket =>
          // Keep only the protocols that are not blacklisted in the configuration.
          val enabledProtocols = sslServerSocket.getEnabledProtocols.filter { protocol =>
            val blacklisted = sslVersionBlacklist.contains(protocol.toLowerCase)
            if (blacklisted) {
              debug(s"Disabling SSL Protocol: $protocol")
            }
            !blacklisted
          }
          sslServerSocket.setEnabledProtocols(enabledProtocols)
          // getEnabledProtocols returns a Java array: render its elements explicitly, since
          // interpolating the array itself would only print its identity hash.
          info("SSL Server Socket Enabled Protocols: " +
            sslServerSocket.getEnabledProtocols.mkString(", "))
        case _ => // Not an SSL socket: there are no protocols to restrict.
      }
      sslSocket
    }

    // Server args
    val maxMessageSize = livyConf.getInt(LivyConf.THRIFT_MAX_MESSAGE_SIZE)
    val requestTimeout = livyConf.getTimeAsMs(LivyConf.THRIFT_LOGIN_TIMEOUT).toInt
    val beBackoffSlotLength =
      livyConf.getTimeAsMs(LivyConf.THRIFT_LOGIN_BEBACKOFF_SLOT_LENGTH).toInt
    val sargs = new TThreadPoolServer.Args(serverSocket)
      .processorFactory(processorFactory)
      .transportFactory(transportFactory)
      .protocolFactory(new TBinaryProtocol.Factory)
      .inputProtocolFactory(
        new TBinaryProtocol.Factory(true, true, maxMessageSize, maxMessageSize))
      .requestTimeout(requestTimeout)
      .requestTimeoutUnit(TimeUnit.MILLISECONDS)
      .beBackoffSlotLength(beBackoffSlotLength)
      .beBackoffSlotLengthUnit(TimeUnit.MILLISECONDS)
      .executorService(executorService)

    // TCP Server
    server = new TThreadPoolServer(sargs)
    server.setServerEventHandler(new TServerEventHandler() {
      override def createContext(input: TProtocol, output: TProtocol): ServerContext = {
        new ThriftCLIServerContext
      }

      // Invoked when a connection's context is discarded; if the context still holds a
      // session handle, the client went away without closing its session.
      override def deleteContext(
          serverContext: ServerContext,
          input: TProtocol,
          output: TProtocol): Unit = {
        val context = serverContext.asInstanceOf[ThriftCLIServerContext]
        val sessionHandle = context.getSessionHandle
        if (sessionHandle != null) {
          info("Session disconnected without closing properly. ")
          try {
            val close = livyConf.getBoolean(LivyConf.THRIFT_CLOSE_SESSION_ON_DISCONNECT)
            // Report what is actually done: the session is only closed when configured so.
            val prefix = if (close) "" else "Not "
            info(prefix + "Closing the session: " + sessionHandle)
            if (close) {
              cliService.closeSession(sessionHandle)
            }
          } catch {
            case e: HiveSQLException => warn("Failed to close session: " + e, e)
          }
        }
      }

      override def preServe(): Unit = {}

      // Publishes the connection's context through currentServerContext.
      override def processContext(
          serverContext: ServerContext,
          input: TTransport,
          output: TTransport): Unit = {
        currentServerContext.set(serverContext)
      }
    })
    info(s"Starting ${classOf[ThriftBinaryCLIService].getSimpleName} on port $portNum " +
      s"with $minWorkerThreads...$maxWorkerThreads worker threads")
  } catch {
    case e: Exception => throw new RuntimeException("Failed to init thrift server", e)
  }
}