in streampark-common/src/main/scala/org/apache/streampark/common/util/MongoConfig.scala [74:162]
def getClient(properties: Properties)(implicit alias: String = ""): MongoClient = {
val mongoParam = getProperties(properties)
if (mongoParam.containsKey(client_uri)) {
val clientURI = new MongoClientURI(mongoParam(client_uri))
new MongoClient(clientURI)
} else {
// 客户端配置(连接数、副本集群验证)
val builder = new MongoClientOptions.Builder
if (mongoParam.containsKey(max_connections_per_host)) {
builder.connectionsPerHost(mongoParam(max_connections_per_host).toInt)
}
if (mongoParam.containsKey(min_connections_per_host)) {
builder.minConnectionsPerHost(mongoParam(min_connections_per_host).toInt)
}
if (mongoParam.containsKey(replica_set)) {
builder.requiredReplicaSetName(mongoParam(replica_set))
}
if (mongoParam.containsKey(threads_allowed_to_block_for_connection_multiplier)) {
builder.threadsAllowedToBlockForConnectionMultiplier(
mongoParam(threads_allowed_to_block_for_connection_multiplier).toInt)
}
if (mongoParam.containsKey(server_selection_timeout)) {
builder.serverSelectionTimeout(mongoParam(server_selection_timeout).toInt)
}
if (mongoParam.containsKey(max_wait_time)) {
builder.maxWaitTime(mongoParam(max_wait_time).toInt)
}
if (mongoParam.containsKey(max_connection_idel_time)) {
builder.maxConnectionIdleTime(mongoParam(max_connection_idel_time).toInt)
}
if (mongoParam.containsKey(max_connection_life_time)) {
builder.maxConnectionLifeTime(mongoParam(max_connection_life_time).toInt)
}
if (mongoParam.containsKey(connect_timeout)) {
builder.connectTimeout(mongoParam(connect_timeout).toInt)
}
if (mongoParam.containsKey(socket_timeout)) {
builder.socketTimeout(mongoParam(socket_timeout).toInt)
}
if (mongoParam.containsKey(ssl_enabled)) {
builder.sslEnabled(mongoParam(ssl_enabled).toBoolean)
}
if (mongoParam.containsKey(ssl_invalid_host_name_allowed)) {
builder.sslInvalidHostNameAllowed(mongoParam(ssl_invalid_host_name_allowed).toBoolean)
}
if (mongoParam.containsKey(always_use_m_beans)) {
builder.alwaysUseMBeans(mongoParam(always_use_m_beans).toBoolean)
}
if (mongoParam.containsKey(heartbeat_frequency)) {
builder.heartbeatFrequency(mongoParam(heartbeat_frequency).toInt)
}
if (mongoParam.containsKey(min_heartbeat_frequency)) {
builder.minHeartbeatFrequency(mongoParam(min_heartbeat_frequency).toInt)
}
if (mongoParam.containsKey(heartbeat_connect_timeout)) {
builder.heartbeatConnectTimeout(mongoParam(heartbeat_connect_timeout).toInt)
}
if (mongoParam.containsKey(heartbeat_socket_timeout)) {
builder.heartbeatSocketTimeout(mongoParam(heartbeat_socket_timeout).toInt)
}
if (mongoParam.containsKey(local_threshold)) {
builder.localThreshold(mongoParam(local_threshold).toInt)
}
val mongoClientOptions = builder.build
val serverAddresses = mongoParam(address)
.split(",")
.map(x => {
val hostAndPort = x.split(":")
val host = hostAndPort.head
val port = hostAndPort(1).toInt
new ServerAddress(host, port)
})
if (mongoParam.containsKey(username)) {
val db =
if (mongoParam.containsKey(authentication_database)) {
mongoParam(authentication_database)
} else {
mongoParam(database)
}
val mongoCredential = MongoCredential.createScramSha1Credential(
mongoParam(username),
db,
mongoParam(password).toCharArray)
new MongoClient(serverAddresses.toList, List(mongoCredential), mongoClientOptions)
} else {
new MongoClient(serverAddresses.toList, mongoClientOptions)
}
}
}