override def open()

in streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/internal/ClickHouseSinkFunction.scala [70:104]


  override def open(parameters: Configuration): Unit = {
    val user: String = clickHouseConf.user
    val driver: String = clickHouseConf.driverClassName
    val properties = new ClickHouseProperties()
    (user, driver) match {
      case (u, d) if u != null && d != null =>
        Class.forName(d)
        properties.setUser(u)
      case (null, null) =>
      case (_, d) if d != null => Class.forName(d)
      case _ => properties.setUser(user)
    }
    // reflect set all properties...
    clickHouseConf.sinkOption
      .getInternalConfig()
      .foreach(
        x => {
          Try(Option(properties.getClass.getDeclaredField(x._1))).getOrElse(None) match {
            case Some(field) =>
              field.setAccessible(true)
              field.getType.getSimpleName match {
                case "String" => field.set(properties, x._2)
                case "int" | "Integer" => field.set(properties, x._2.toInt)
                case "long" | "Long" => field.set(properties, x._2.toLong)
                case "boolean" | "Boolean" => field.set(properties, x._2.toBoolean)
                case _ =>
              }
            case None =>
              logWarn(
                s"ClickHouseProperties config error,property:${x._1} invalid,please see ru.yandex.clickhouse.settings.ClickHouseProperties")
          }
        })
    val dataSource = new ClickHouseDataSource(clickHouseConf.jdbcUrl, properties)
    connection = dataSource.getConnection
  }