in streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/internal/ClickHouseSinkFunction.scala [70:104]
/**
 * Prepares the sink: loads the JDBC driver class when one is configured, builds a
 * `ClickHouseProperties` from the sink options and opens the connection used by this function.
 *
 * Every entry returned by `clickHouseConf.sinkOption.getInternalConfig()` is matched by name
 * against the fields declared on `ClickHouseProperties` and copied onto the matching field via
 * reflection. Only `String`, `int`/`Integer`, `long`/`Long` and `boolean`/`Boolean` fields are
 * supported; unknown names and unsupported field types are reported with a warning and skipped.
 *
 * @param parameters
 *   configuration handed in by the runtime; it is not read by this method
 * @throws IllegalArgumentException
 *   (including `NumberFormatException`) when a configured value cannot be converted to the type
 *   of its target field
 */
override def open(parameters: Configuration): Unit = {
  val user: String = clickHouseConf.user
  val driver: String = clickHouseConf.driverClassName
  val properties = new ClickHouseProperties()

  // Both settings are optional and independent of each other: load the driver if one is
  // configured, then set the user if one is configured.
  Option(driver).foreach(d => Class.forName(d))
  Option(user).foreach(u => properties.setUser(u))

  // reflect set all properties...
  clickHouseConf.sinkOption
    .getInternalConfig()
    .foreach(
      entry => {
        val key = entry._1
        val value = entry._2
        // getDeclaredField throws when no field with this name exists; treat that as "unknown".
        Try(properties.getClass.getDeclaredField(key)).toOption match {
          case Some(field) =>
            field.setAccessible(true)
            val typeName = field.getType.getSimpleName
            try {
              typeName match {
                case "String" => field.set(properties, value)
                case "int" | "Integer" => field.set(properties, value.toInt)
                case "long" | "Long" => field.set(properties, value.toLong)
                case "boolean" | "Boolean" => field.set(properties, value.toBoolean)
                case _ =>
                  // Do not drop the option silently: the user would otherwise have no hint
                  // that the setting had no effect.
                  logWarn(
                    s"ClickHouseProperties config ignored,property:$key has unsupported type:$typeName")
              }
            } catch {
              // toInt/toLong throw NumberFormatException and toBoolean throws
              // IllegalArgumentException; name the offending property before failing so the
              // misconfiguration can be located. The original exception is rethrown unchanged.
              case e: IllegalArgumentException =>
                logWarn(
                  s"ClickHouseProperties config error,property:$key cannot be converted to $typeName")
                throw e
            }
          case None =>
            logWarn(
              s"ClickHouseProperties config error,property:${key} invalid,please see ru.yandex.clickhouse.settings.ClickHouseProperties")
        }
      })

  val dataSource = new ClickHouseDataSource(clickHouseConf.jdbcUrl, properties)
  connection = dataSource.getConnection
}