private def getOrCreateImpalaClient()

in linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/executor/ImpalaEngineConnExecutor.scala [302:448]


  private def getOrCreateImpalaClient(
      engineExecutionContext: EngineExecutionContext
  ): ImpalaClient = {
    val userCreatorLabel =
      engineExecutionContext.getLabels.find(_.isInstanceOf[UserCreatorLabel]).get
    val engineTypeLabel =
      engineExecutionContext.getLabels.find(_.isInstanceOf[EngineTypeLabel]).get
    var configMap: util.Map[String, String] = null
    if (userCreatorLabel != null && engineTypeLabel != null) {
      configMap = Utils.tryAndWarn(
        ImpalaEngineConfig.getCacheMap(
          (
            userCreatorLabel.asInstanceOf[UserCreatorLabel],
            engineTypeLabel.asInstanceOf[EngineTypeLabel]
          )
        )
      )
    }
    if (configMap == null) {
      configMap = new util.HashMap[String, String]()
    }

    val properties = engineExecutionContext.getProperties.asInstanceOf[util.Map[String, String]]
    if (MapUtils.isNotEmpty(properties)) {
      configMap.putAll(properties)
    }

    val impalaServers = IMPALA_SERVERS.getValue(configMap)
    val impalaMaxConnections = IMPALA_MAX_CONNECTIONS.getValue(configMap)
    val impalaSaslEnable = IMPALA_SASL_ENABLE.getValue(configMap)
    val impalaSaslProperties = IMPALA_SASL_PROPERTIES.getValue(configMap)
    val impalaSaslUsername = IMPALA_SASL_USERNAME.getValue(configMap)
    val impalaSaslPassword = IMPALA_SASL_PASSWORD.getValue(configMap)
    val impalaSaslPasswordCmd = IMPALA_SASL_PASSWORD_CMD.getValue(configMap)
    val impalaSaslMechanism = IMPALA_SASL_MECHANISM.getValue(configMap)
    val impalaSaslAuthorization = IMPALA_SASL_AUTHORIZATION_ID.getValue(configMap)
    val impalaSaslProtocol = IMPALA_SASL_PROTOCOL.getValue(configMap)
    val impalaHeartbeatSeconds = IMPALA_HEARTBEAT_SECONDS.getValue(configMap)
    val impalaQueryTimeoutSeconds = IMPALA_QUERY_TIMEOUT_SECONDS.getValue(configMap)
    val impalaQueryBatchSize = IMPALA_QUERY_BATCH_SIZE.getValue(configMap)
    val impalaQueryOptions = IMPALA_QUERY_OPTIONS.getValue(configMap)

    val impalaSslEnable = IMPALA_SSL_ENABLE.getValue(configMap)
    val impalaSslKeystore = IMPALA_SSL_KEYSTORE.getValue(configMap)
    val impalaSslKeystoreType = IMPALA_SSL_KEYSTORE_TYPE.getValue(configMap)
    val impalaSslKeystorePassword = IMPALA_SSL_KEYSTORE_PASSWORD.getValue(configMap)
    val impalaSslTruststore = IMPALA_SSL_TRUSTSTORE.getValue(configMap)
    val impalaSslTruststoreType = IMPALA_SSL_TRUSTSTORE_TYPE.getValue(configMap)
    val impalaSslTruststorePassword = IMPALA_SSL_TRUSTSTORE_PASSWORD.getValue(configMap)

    val impalaClientKey = Array(
      impalaServers,
      impalaMaxConnections,
      impalaSaslEnable,
      impalaSaslProperties,
      impalaSaslUsername,
      impalaSaslPassword,
      impalaSaslPasswordCmd,
      impalaSaslMechanism,
      impalaSaslAuthorization,
      impalaSaslProtocol,
      impalaHeartbeatSeconds,
      impalaQueryTimeoutSeconds,
      impalaQueryBatchSize,
      impalaQueryOptions,
      impalaSslEnable,
      impalaSslKeystore,
      impalaSslKeystoreType,
      impalaSslKeystorePassword,
      impalaSslTruststore,
      impalaSslTruststoreType,
      impalaSslTruststorePassword
    ).mkString("/")

    impalaClients.synchronized {
      var client = impalaClients.get(impalaClientKey)
      if (client == null) {
        val socketFactory = createSocketFactory(
          impalaSslEnable,
          impalaSslKeystore,
          impalaSslKeystoreType,
          impalaSslKeystorePassword,
          impalaSslTruststore,
          impalaSslTruststoreType,
          impalaSslTruststorePassword
        )

        val servers = impalaServers.split(',')
        val maxConnections = impalaMaxConnections
        val factory: ImpalaThriftSessionFactory = if (impalaSaslEnable) {
          val saslProperties: util.Map[String, String] = new util.TreeMap()
          Option(impalaSaslProperties)
            .map(_.split(','))
            .getOrElse(Array[String]())
            .foreach { str =>
              val kv = StringUtils.split(str, "=", 2)
              saslProperties.put(kv(0), if (kv.length > 1) kv(1) else "")
            }

          val password = impalaSaslPassword
          val passwordCmd = impalaSaslPasswordCmd
          var passwordCallback: PasswordCallback = null
          if (StringUtils.isNotBlank(passwordCmd)) {
            passwordCallback = new CommandPasswordCallback(passwordCmd);
          } else if (StringUtils.isNotBlank(password)) {
            passwordCallback = new StaticPasswordCallback(password);
          }

          val callbackHandler: CallbackHandler = new CallbackHandler() {
            override def handle(callbacks: Array[Callback]): Unit = callbacks.foreach {
              case callback: NameCallback => callback.setName(impalaSaslUsername)
              case callback: PasswordCallback => callback.setPassword(passwordCallback.getPassword)
            }
          }

          new ImpalaThriftSessionFactory(
            servers,
            maxConnections,
            socketFactory,
            impalaSaslMechanism,
            impalaSaslAuthorization,
            impalaSaslProtocol,
            saslProperties,
            callbackHandler
          )
        } else {
          new ImpalaThriftSessionFactory(servers, maxConnections, socketFactory)
        }

        val impalaClient = new ImpalaThriftClient(factory, impalaHeartbeatSeconds)
        impalaClient.setQueryTimeoutInSeconds(impalaQueryTimeoutSeconds)
        impalaClient.setBatchSize(impalaQueryBatchSize)
        Option(impalaQueryOptions)
          .map(_.split(','))
          .getOrElse(Array[String]())
          .foreach { str =>
            if (StringUtils.contains(str, "=")) {
              val kv = StringUtils.split(str, "=", 2)
              impalaClient.setQueryOption(kv(0), if (kv.length > 1) kv(1) else "")
            }
          }

        client = impalaClient
      }
      client
    }
  }