in streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch7/src/main/scala/org/apache/streampark/flink/connector/elasticsearch7/bean/RestClientFactoryImpl.scala [33:95]
/**
 * Applies the settings held in `config` to the given Elasticsearch REST client builder.
 *
 * Two callbacks are registered on the builder: one that installs basic-auth credentials on
 * the underlying HTTP client (only when both user name and password are configured), and one
 * that applies the request-level settings (timeouts, redirects, compression, URI
 * normalization). Afterwards the default `Content-Type` header and, when configured, the
 * path prefix are set.
 *
 * @param restClientBuilder
 *   the builder to configure; it is mutated in place
 * @throws IllegalArgumentException
 *   if exactly one of `config.userName` / `config.password` is set
 */
override def configureRestClientBuilder(restClientBuilder: RestClientBuilder): Unit = {

  // Registers the httpClientConfigCallback and the requestConfigCallback.
  def configCallback(): Unit = {
    val userName = config.userName
    val password = config.password
    // userName and password must either both be set or both be absent.
    require(
      (userName != null && password != null) || (userName == null && password == null),
      "[StreamPark] elasticsearch auth info error,userName,password must be all set,or all not set."
    )

    // None when no credentials are configured. The `require` above guarantees that
    // password is non-null whenever userName is non-null.
    val credentialsProvider: Option[CredentialsProvider] = Option(userName).map { user =>
      val provider: CredentialsProvider = new BasicCredentialsProvider()
      provider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, password))
      provider
    }

    val httpClientConfigCallback = new RestClientBuilder.HttpClientConfigCallback {
      override def customizeHttpClient(
          httpClientBuilder: HttpAsyncClientBuilder): HttpAsyncClientBuilder = {
        credentialsProvider.foreach { provider =>
          httpClientBuilder.setDefaultCredentialsProvider(provider)
          logInfo("elasticsearch auth by userName,password...")
        }
        httpClientBuilder
      }
    }

    val requestConfigCallback = new RestClientBuilder.RequestConfigCallback {
      override def customizeRequestConfig(
          requestConfigBuilder: RequestConfig.Builder): RequestConfig.Builder = {
        if (credentialsProvider.isDefined) {
          requestConfigBuilder.setAuthenticationEnabled(true)
        }
        requestConfigBuilder
          .setConnectionRequestTimeout(config.connectRequestTimeout)
          .setConnectTimeout(config.connectTimeout)
          // Fix: this used to call setConnectTimeout a second time with the socket timeout,
          // which overwrote the connect timeout and left the socket timeout unset.
          .setSocketTimeout(config.socketTimeout)
          .setMaxRedirects(config.maxRedirects)
          .setRedirectsEnabled(config.redirectsEnabled)
          .setRelativeRedirectsAllowed(config.relativeRedirectsAllowed)
          .setContentCompressionEnabled(config.contentCompressionEnabled)
          .setNormalizeUri(config.normalizeUri)
      }
    }

    restClientBuilder.setHttpClientConfigCallback(httpClientConfigCallback)
    restClientBuilder.setRequestConfigCallback(requestConfigCallback)
  }

  // Sets the default Content-Type header and the optional path prefix.
  def setHeader(): Unit = {
    val contentTypeHeader = new BasicHeader("Content-Type", config.contentType)
    restClientBuilder.setDefaultHeaders(Array(contentTypeHeader))
    // A null pathPrefix means "not configured": leave the builder's prefix untouched.
    Option(config.pathPrefix).foreach(path => restClientBuilder.setPathPrefix(path))
  }

  configCallback()
  setHeader()
}