in streampark-common/src/main/scala/org/apache/streampark/common/util/JdbcUtils.scala [204:269]
def getConnection(prop: Properties): Connection = {
val alias = prop(KEY_ALIAS)
val lock = lockMap.getOrElseUpdate(alias, new ReentrantLock())
try {
lock.lock()
val ds: HikariDataSource =
Try(Option(dataSourceHolder(alias))).getOrElse(None) match {
case None =>
val jdbcConfig = new HikariConfig()
prop
.filter(x => x._1 != KEY_ALIAS && x._1 != KEY_SEMANTIC)
.foreach(x => {
Try(Option(jdbcConfig.getClass.getDeclaredField(x._1)))
.getOrElse(None) match {
case Some(field) =>
field.setAccessible(true)
field.getType.getSimpleName match {
case "String" =>
field.set(jdbcConfig, x._2.asInstanceOf[Object])
case "int" =>
field.set(jdbcConfig, x._2.toInt.asInstanceOf[Object])
case "long" =>
field
.set(jdbcConfig, x._2.toLong.asInstanceOf[Object])
case "boolean" =>
field.set(jdbcConfig, x._2.toBoolean.asInstanceOf[Object])
case _ =>
}
case None =>
val setMethod =
s"set${x._1.substring(0, 1).toUpperCase}${x._1.substring(1)}"
val method = Try(
jdbcConfig.getClass.getMethods
.filter(_.getName == setMethod)
.filter(_.getParameterCount == 1)
.head).getOrElse(null)
method match {
case m if m != null =>
m.setAccessible(true)
m.getParameterTypes.head.getSimpleName match {
case "String" =>
m.invoke(jdbcConfig, Seq(x._2.asInstanceOf[Object]): _*)
case "int" =>
m.invoke(jdbcConfig, Seq(x._2.toInt.asInstanceOf[Object]): _*)
case "long" =>
m.invoke(jdbcConfig, Seq(x._2.toLong.asInstanceOf[Object]): _*)
case "boolean" =>
m.invoke(jdbcConfig, Seq(x._2.toBoolean.asInstanceOf[Object]): _*)
case _ =>
}
case null =>
throw new IllegalArgumentException(
s"jdbcConfig error,property:${x._1} invalid,please see more properties jdbcConfig https://github.com/brettwooldridge/HikariCP")
}
}
})
val ds = new HikariDataSource(jdbcConfig)
dataSourceHolder += alias -> ds
ds
case Some(x) => x
}
ds.getConnection()
} finally {
lock.unlock()
}
}