override def configureRestClientBuilder()

in streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch7/src/main/scala/org/apache/streampark/flink/connector/elasticsearch7/bean/RestClientFactoryImpl.scala [33:95]


  /**
   * Applies the connector configuration held in `config` to the supplied Elasticsearch
   * `RestClientBuilder`.
   *
   * Two things are configured, in this order:
   *  1. the HTTP client and request-config callbacks (optional basic authentication,
   *     timeouts, redirect handling, compression and URI normalization);
   *  1. the default `Content-Type` header and, when one is configured, the path prefix.
   *
   * @param restClientBuilder the builder to configure; it is mutated in place
   * @throws IllegalArgumentException if only one of `config.userName` / `config.password` is set
   */
  override def configureRestClientBuilder(restClientBuilder: RestClientBuilder): Unit = {

    // Registers the httpClientConfigCallback and the requestConfigCallback on the builder.
    def configCallback(): Unit = {
      val userName = config.userName
      val password = config.password
      // userName and password must either both be set or both be unset.
      require(
        (userName != null && password != null) || (userName == null && password == null),
        "[StreamPark] elasticsearch auth info error,userName,password must be all set,or all not set."
      )

      // After the check above a non-null userName implies a non-null password, so testing
      // userName alone is enough to decide whether authentication is configured.
      val credentialsProvider: Option[CredentialsProvider] = Option(userName).map {
        _ =>
          val provider: CredentialsProvider = new BasicCredentialsProvider()
          provider.setCredentials(
            AuthScope.ANY,
            new UsernamePasswordCredentials(userName, password))
          provider
      }

      val httpClientConfigCallback = new RestClientBuilder.HttpClientConfigCallback {
        override def customizeHttpClient(
            httpClientBuilder: HttpAsyncClientBuilder): HttpAsyncClientBuilder = {
          credentialsProvider.foreach {
            provider =>
              httpClientBuilder.setDefaultCredentialsProvider(provider)
              logInfo("elasticsearch auth by userName,password...")
          }
          httpClientBuilder
        }
      }

      val requestConfigCallback = new RestClientBuilder.RequestConfigCallback {
        override def customizeRequestConfig(
            requestConfigBuilder: RequestConfig.Builder): RequestConfig.Builder = {
          if (credentialsProvider.isDefined) {
            requestConfigBuilder.setAuthenticationEnabled(true)
          }
          requestConfigBuilder.setConnectionRequestTimeout(config.connectRequestTimeout)
          requestConfigBuilder.setConnectTimeout(config.connectTimeout)
          requestConfigBuilder.setMaxRedirects(config.maxRedirects)
          requestConfigBuilder.setRedirectsEnabled(config.redirectsEnabled)
          // The socket timeout has its own setter; it was previously passed to
          // setConnectTimeout, which overwrote the connect timeout configured above and
          // left the socket timeout unset.
          requestConfigBuilder.setSocketTimeout(config.socketTimeout)
          requestConfigBuilder.setRelativeRedirectsAllowed(config.relativeRedirectsAllowed)
          requestConfigBuilder.setContentCompressionEnabled(config.contentCompressionEnabled)
          requestConfigBuilder.setNormalizeUri(config.normalizeUri)
        }
      }

      restClientBuilder.setHttpClientConfigCallback(httpClientConfigCallback)
      restClientBuilder.setRequestConfigCallback(requestConfigCallback)
    }

    // Sets the default Content-Type header and, if configured, the request path prefix.
    def setHeader(): Unit = {
      val contentTypeHeader = new BasicHeader("Content-Type", config.contentType)
      restClientBuilder.setDefaultHeaders(Array(contentTypeHeader))
      // A null pathPrefix means "not configured": leave the builder untouched.
      Option(config.pathPrefix).foreach(path => restClientBuilder.setPathPrefix(path))
    }

    configCallback()
    setHeader()
  }