def getClient()

in streampark-common/src/main/scala/org/apache/streampark/common/util/MongoConfig.scala [74:160]


  /**
   * Creates a [[MongoClient]] from the settings that `getProperties` resolves out of
   * `properties`.
   *
   * If a client URI is configured, the client is built from that URI alone and every other
   * setting is ignored. Otherwise each configured option is copied onto a
   * `MongoClientOptions.Builder` (options that are absent are simply not set on the builder),
   * the seed list is parsed from the comma-separated address setting, and SCRAM-SHA-1
   * credentials are attached when a username is configured.
   *
   * Address entries may be written as `host:port` or just `host`; the latter uses the driver's
   * default port. Whitespace around entries is ignored and empty entries (e.g. a trailing
   * comma) are skipped.
   *
   * @param properties
   *   raw configuration passed to `getProperties`
   * @param alias
   *   implicit alias; not referenced directly in this method (presumably consumed by
   *   `getProperties` -- confirm against its signature)
   * @return
   *   a newly constructed client
   * @throws NumberFormatException
   *   if a numeric option or a port is not a valid integer
   * @throws IllegalArgumentException
   *   if a boolean option is neither "true" nor "false", if an address entry is malformed, or
   *   if no usable address is configured
   */
  def getClient(properties: Properties)(implicit alias: String = ""): MongoClient = {
    val mongoParam = getProperties(properties)
    if (mongoParam.containsKey(client_uri)) {
      // A full connection URI wins over all of the individual settings below.
      val clientURI = new MongoClientURI(mongoParam(client_uri))
      new MongoClient(clientURI)
    } else {
      // Client configuration (connection counts, replica-set verification, timeouts, ...).
      // Each option is applied only when it is present in the resolved settings.
      val builder = new MongoClientOptions.Builder
      if (mongoParam.containsKey(max_connections_per_host)) {
        builder.connectionsPerHost(mongoParam(max_connections_per_host).toInt)
      }
      if (mongoParam.containsKey(min_connections_per_host)) {
        builder.minConnectionsPerHost(mongoParam(min_connections_per_host).toInt)
      }
      if (mongoParam.containsKey(replica_set)) {
        builder.requiredReplicaSetName(mongoParam(replica_set))
      }
      if (mongoParam.containsKey(threads_allowed_to_block_for_connection_multiplier)) {
        builder.threadsAllowedToBlockForConnectionMultiplier(
          mongoParam(threads_allowed_to_block_for_connection_multiplier).toInt)
      }
      if (mongoParam.containsKey(server_selection_timeout)) {
        builder.serverSelectionTimeout(mongoParam(server_selection_timeout).toInt)
      }
      if (mongoParam.containsKey(max_wait_time)) {
        builder.maxWaitTime(mongoParam(max_wait_time).toInt)
      }
      if (mongoParam.containsKey(max_connection_idel_time)) {
        builder.maxConnectionIdleTime(mongoParam(max_connection_idel_time).toInt)
      }
      if (mongoParam.containsKey(max_connection_life_time)) {
        builder.maxConnectionLifeTime(mongoParam(max_connection_life_time).toInt)
      }
      if (mongoParam.containsKey(connect_timeout)) {
        builder.connectTimeout(mongoParam(connect_timeout).toInt)
      }
      if (mongoParam.containsKey(socket_timeout)) {
        builder.socketTimeout(mongoParam(socket_timeout).toInt)
      }
      if (mongoParam.containsKey(ssl_enabled)) {
        builder.sslEnabled(mongoParam(ssl_enabled).toBoolean)
      }
      if (mongoParam.containsKey(ssl_invalid_host_name_allowed)) {
        builder.sslInvalidHostNameAllowed(mongoParam(ssl_invalid_host_name_allowed).toBoolean)
      }
      if (mongoParam.containsKey(always_use_m_beans)) {
        builder.alwaysUseMBeans(mongoParam(always_use_m_beans).toBoolean)
      }
      if (mongoParam.containsKey(heartbeat_frequency)) {
        builder.heartbeatFrequency(mongoParam(heartbeat_frequency).toInt)
      }
      if (mongoParam.containsKey(min_heartbeat_frequency)) {
        builder.minHeartbeatFrequency(mongoParam(min_heartbeat_frequency).toInt)
      }
      if (mongoParam.containsKey(heartbeat_connect_timeout)) {
        builder.heartbeatConnectTimeout(mongoParam(heartbeat_connect_timeout).toInt)
      }
      if (mongoParam.containsKey(heartbeat_socket_timeout)) {
        builder.heartbeatSocketTimeout(mongoParam(heartbeat_socket_timeout).toInt)
      }
      if (mongoParam.containsKey(local_threshold)) {
        builder.localThreshold(mongoParam(local_threshold).toInt)
      }
      val mongoClientOptions = builder.build

      // Parse "host[:port],host[:port],..." into the seed list. Entries are trimmed and blank
      // ones dropped so that "a:1, b:2" or a trailing comma does not break start-up.
      val serverAddresses = mongoParam(address)
        .split(",")
        .map(_.trim)
        .filter(_.nonEmpty)
        .map(
          entry =>
            entry.split(":") match {
              case Array(host, port, _*) => new ServerAddress(host.trim, port.trim.toInt)
              // No port given: let the driver apply its default port.
              case Array(host) => new ServerAddress(host.trim)
              // e.g. a bare ":" splits into an empty array.
              case _ =>
                throw new IllegalArgumentException(s"Invalid MongoDB server address: '$entry'")
            })
      require(serverAddresses.nonEmpty, "No MongoDB server address is configured")

      if (mongoParam.containsKey(username)) {
        // Authenticate against the dedicated authentication database when one is configured,
        // otherwise against the working database.
        val db =
          if (mongoParam.containsKey(authentication_database)) mongoParam(authentication_database)
          else mongoParam(database)
        val mongoCredential = MongoCredential.createScramSha1Credential(
          mongoParam(username),
          db,
          mongoParam(password).toCharArray)
        new MongoClient(serverAddresses.toList, List(mongoCredential), mongoClientOptions)
      } else {
        new MongoClient(serverAddresses.toList, mongoClientOptions)
      }
    }
  }