def getConnection()

in streampark-common/src/main/scala/org/apache/streampark/common/util/JdbcUtils.scala [204:269]


  /**
   * Borrows a JDBC connection from the Hikari data source registered under the alias found in
   * `prop` (key `KEY_ALIAS`), creating and registering that data source on first use.
   *
   * A per-alias lock guards only the lookup-or-create step, so at most one data source is built
   * per alias. The lock is released before the connection is borrowed, so callers waiting on the
   * pool do not queue behind each other on the lock.
   *
   * @param prop
   *   connection settings; every entry other than `KEY_ALIAS` and `KEY_SEMANTIC` is applied to the
   *   `HikariConfig` (see `buildHikariConfig`)
   * @return
   *   the result of `getConnection()` on the data source for the alias
   * @throws IllegalArgumentException
   *   if an entry matches neither a declared `HikariConfig` field nor a one-argument setter
   *   (also raised as `NumberFormatException` when a numeric value cannot be parsed)
   */
  def getConnection(prop: Properties): Connection = {
    val alias = prop(KEY_ALIAS)
    val lock = lockMap.getOrElseUpdate(alias, new ReentrantLock())
    // Acquire outside the try: if lock() itself fails we must not run unlock() on a lock we
    // do not hold (that would raise IllegalMonitorStateException and hide the real error).
    lock.lock()
    val ds: HikariDataSource =
      try {
        // The lookup is wrapped in Try because it may throw when the alias is absent.
        Try(Option(dataSourceHolder(alias))).getOrElse(None) match {
          case Some(existing) => existing
          case None =>
            val created = new HikariDataSource(buildHikariConfig(prop))
            dataSourceHolder += alias -> created
            created
        }
      } finally {
        lock.unlock()
      }
    // Borrow outside the lock: this call can block while the pool is busy.
    ds.getConnection()
  }

  /**
   * Builds a `HikariConfig` from `prop`, skipping the `KEY_ALIAS` and `KEY_SEMANTIC` entries.
   *
   * Each remaining key is resolved first as a field declared on the config class and, failing
   * that, as a public one-argument `setXxx` method. Targets whose type is not String, int, long
   * or boolean are left untouched.
   */
  private def buildHikariConfig(prop: Properties): HikariConfig = {
    val jdbcConfig = new HikariConfig()
    prop
      .filter(kv => kv._1 != KEY_ALIAS && kv._1 != KEY_SEMANTIC)
      .foreach {
        case (key, raw) =>
          Try(jdbcConfig.getClass.getDeclaredField(key)).toOption match {
            case Some(field) =>
              // Direct field write: any validation done by the matching setter is not run.
              field.setAccessible(true)
              toConfigValue(field.getType, raw).foreach(field.set(jdbcConfig, _))
            case None =>
              // capitalize is locale-independent and tolerates an empty key, which then fails
              // below with the regular "invalid property" error.
              val setterName = s"set${key.capitalize}"
              jdbcConfig.getClass.getMethods
                .find(m => m.getName == setterName && m.getParameterCount == 1) match {
                case Some(setter) =>
                  setter.setAccessible(true)
                  toConfigValue(setter.getParameterTypes.head, raw)
                    .foreach(value => setter.invoke(jdbcConfig, value))
                case None =>
                  throw new IllegalArgumentException(
                    s"jdbcConfig error,property:${key} invalid,please see more properties jdbcConfig https://github.com/brettwooldridge/HikariCP")
              }
          }
      }
    jdbcConfig
  }

  /**
   * Converts the textual property value to a boxed value of `target`'s type.
   *
   * @return
   *   `None` when `target` is not String, int, long or boolean, meaning the property is skipped
   */
  private def toConfigValue(target: Class[_], raw: String): Option[Object] =
    target.getSimpleName match {
      case "String" => Some(raw)
      case "int" => Some(Int.box(raw.toInt))
      case "long" => Some(Long.box(raw.toLong))
      case "boolean" => Some(Boolean.box(raw.toBoolean))
      case _ => None
    }