in linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/executor/ImpalaEngineConnExecutor.scala [302:448]
/**
 * Returns the Impala client matching the effective configuration of the given execution
 * context, creating and registering it in `impalaClients` on first use.
 *
 * The effective configuration is the map obtained from `ImpalaEngineConfig.getCacheMap` for the
 * context's user-creator / engine-type labels (when both labels are present and the lookup
 * succeeds), overlaid with the properties of the execution context. All connection-related
 * settings are joined into a single string that is used as the key of `impalaClients`.
 *
 * @param engineExecutionContext
 *   execution context supplying the labels and the per-execution properties
 * @return
 *   the client registered for the resolved settings, or a newly created one
 */
private def getOrCreateImpalaClient(
    engineExecutionContext: EngineExecutionContext
): ImpalaClient = {
  val labels = engineExecutionContext.getLabels

  // A missing label simply means "no label-based configuration"; it must not abort the lookup.
  val labelConfig = for {
    userCreatorLabel <- labels.collectFirst { case label: UserCreatorLabel => label }
    engineTypeLabel <- labels.collectFirst { case label: EngineTypeLabel => label }
    config <- Option(
      Utils.tryAndWarn(ImpalaEngineConfig.getCacheMap((userCreatorLabel, engineTypeLabel)))
    )
  } yield config

  // Merge into a fresh map so the map handed out by getCacheMap is never mutated with the
  // properties of a single execution.
  val configMap: util.Map[String, String] = new util.HashMap[String, String]()
  labelConfig.foreach(config => configMap.putAll(config))
  val properties = engineExecutionContext.getProperties.asInstanceOf[util.Map[String, String]]
  if (MapUtils.isNotEmpty(properties)) {
    configMap.putAll(properties)
  }

  val impalaServers = IMPALA_SERVERS.getValue(configMap)
  val impalaMaxConnections = IMPALA_MAX_CONNECTIONS.getValue(configMap)
  val impalaSaslEnable = IMPALA_SASL_ENABLE.getValue(configMap)
  val impalaSaslProperties = IMPALA_SASL_PROPERTIES.getValue(configMap)
  val impalaSaslUsername = IMPALA_SASL_USERNAME.getValue(configMap)
  val impalaSaslPassword = IMPALA_SASL_PASSWORD.getValue(configMap)
  val impalaSaslPasswordCmd = IMPALA_SASL_PASSWORD_CMD.getValue(configMap)
  val impalaSaslMechanism = IMPALA_SASL_MECHANISM.getValue(configMap)
  val impalaSaslAuthorization = IMPALA_SASL_AUTHORIZATION_ID.getValue(configMap)
  val impalaSaslProtocol = IMPALA_SASL_PROTOCOL.getValue(configMap)
  val impalaHeartbeatSeconds = IMPALA_HEARTBEAT_SECONDS.getValue(configMap)
  val impalaQueryTimeoutSeconds = IMPALA_QUERY_TIMEOUT_SECONDS.getValue(configMap)
  val impalaQueryBatchSize = IMPALA_QUERY_BATCH_SIZE.getValue(configMap)
  val impalaQueryOptions = IMPALA_QUERY_OPTIONS.getValue(configMap)
  val impalaSslEnable = IMPALA_SSL_ENABLE.getValue(configMap)
  val impalaSslKeystore = IMPALA_SSL_KEYSTORE.getValue(configMap)
  val impalaSslKeystoreType = IMPALA_SSL_KEYSTORE_TYPE.getValue(configMap)
  val impalaSslKeystorePassword = IMPALA_SSL_KEYSTORE_PASSWORD.getValue(configMap)
  val impalaSslTruststore = IMPALA_SSL_TRUSTSTORE.getValue(configMap)
  val impalaSslTruststoreType = IMPALA_SSL_TRUSTSTORE_TYPE.getValue(configMap)
  val impalaSslTruststorePassword = IMPALA_SSL_TRUSTSTORE_PASSWORD.getValue(configMap)

  // Every setting that influences how a client is built takes part in the cache key.
  val impalaClientKey = Array(
    impalaServers,
    impalaMaxConnections,
    impalaSaslEnable,
    impalaSaslProperties,
    impalaSaslUsername,
    impalaSaslPassword,
    impalaSaslPasswordCmd,
    impalaSaslMechanism,
    impalaSaslAuthorization,
    impalaSaslProtocol,
    impalaHeartbeatSeconds,
    impalaQueryTimeoutSeconds,
    impalaQueryBatchSize,
    impalaQueryOptions,
    impalaSslEnable,
    impalaSslKeystore,
    impalaSslKeystoreType,
    impalaSslKeystorePassword,
    impalaSslTruststore,
    impalaSslTruststoreType,
    impalaSslTruststorePassword
  ).mkString("/")

  // Applies `consume` to each "key=value" entry of a comma separated list. Entries without '='
  // are ignored, and so are entries for which StringUtils.split yields no token at all
  // (e.g. "="), which would otherwise fail on kv(0).
  def foreachKeyValue(raw: String)(consume: (String, String) => Unit): Unit =
    Option(raw)
      .map(_.split(','))
      .getOrElse(Array.empty[String])
      .foreach { entry =>
        if (StringUtils.contains(entry, "=")) {
          val kv = StringUtils.split(entry, "=", 2)
          if (kv.nonEmpty) {
            consume(kv(0), if (kv.length > 1) kv(1) else "")
          }
        }
      }

  impalaClients.synchronized {
    Option(impalaClients.get(impalaClientKey)).getOrElse {
      val socketFactory = createSocketFactory(
        impalaSslEnable,
        impalaSslKeystore,
        impalaSslKeystoreType,
        impalaSslKeystorePassword,
        impalaSslTruststore,
        impalaSslTruststoreType,
        impalaSslTruststorePassword
      )
      val servers = impalaServers.split(',')

      val sessionFactory: ImpalaThriftSessionFactory =
        if (impalaSaslEnable) {
          val saslProperties: util.Map[String, String] = new util.TreeMap()
          foreachKeyValue(impalaSaslProperties) { (key, value) =>
            saslProperties.put(key, value)
          }

          // A password command takes precedence over a statically configured password.
          // NOTE(review): when neither is configured this stays null and a PasswordCallback
          // request fails with a NullPointerException below - confirm whether that is intended.
          val passwordCallback: PasswordCallback =
            if (StringUtils.isNotBlank(impalaSaslPasswordCmd)) {
              new CommandPasswordCallback(impalaSaslPasswordCmd)
            } else if (StringUtils.isNotBlank(impalaSaslPassword)) {
              new StaticPasswordCallback(impalaSaslPassword)
            } else {
              null
            }

          val callbackHandler: CallbackHandler = new CallbackHandler() {
            override def handle(callbacks: Array[Callback]): Unit = callbacks.foreach {
              case callback: NameCallback => callback.setName(impalaSaslUsername)
              case callback: PasswordCallback =>
                callback.setPassword(passwordCallback.getPassword)
              // Report unknown callbacks the way the CallbackHandler contract specifies
              // instead of failing with a scala.MatchError.
              case other =>
                throw new javax.security.auth.callback.UnsupportedCallbackException(other)
            }
          }

          new ImpalaThriftSessionFactory(
            servers,
            impalaMaxConnections,
            socketFactory,
            impalaSaslMechanism,
            impalaSaslAuthorization,
            impalaSaslProtocol,
            saslProperties,
            callbackHandler
          )
        } else {
          new ImpalaThriftSessionFactory(servers, impalaMaxConnections, socketFactory)
        }

      val impalaClient = new ImpalaThriftClient(sessionFactory, impalaHeartbeatSeconds)
      impalaClient.setQueryTimeoutInSeconds(impalaQueryTimeoutSeconds)
      impalaClient.setBatchSize(impalaQueryBatchSize)
      foreachKeyValue(impalaQueryOptions) { (key, value) =>
        impalaClient.setQueryOption(key, value)
      }

      // Register the new client so that later calls with the same settings reuse it.
      impalaClients.put(impalaClientKey, impalaClient)
      impalaClient
    }
  }
}