private[this] def convertSql()

in streampark-common/src/main/scala/org/apache/streampark/common/util/SqlConvertUtils.scala [188:240]


  private[this] def convertSql(
      sql: String,
      typeFunc: (String, String) => String = null,
      keyFunc: String => String = null,
      postfix: String = null): String = {

    val formattedSql = formatSql(sql)
    val scanner = new Scanner(formattedSql)
    val sqlBuffer = new StringBuffer()
    while (scanner.hasNextLine) {
      val line = {
        val line = scanner.nextLine().trim
        line.toUpperCase().trim match {
          case a if a.startsWith("CREATE ") => line
          case a if a.startsWith("PRIMARY KEY ") =>
            keyFunc match {
              case null => null
              case _ => keyFunc(line)
            }
          case a if !a.startsWith("UNIQUE KEY ") && !a.startsWith("KEY ") =>
            val matcher = FIELD_REGEXP.matcher(line)
            if (!matcher.find()) null;
            else {
              val fieldName = matcher.group(1)
              val dataType = (matcher.group(2), matcher.group(3)) match {
                case (_, b) if b != null => b
                case (a, _) => a
              }
              val length = matcher.group(4)
              if (dataType == null) null;
              else {
                val fieldType = typeFunc(dataType, length)
                matcher.group(8) match {
                  case x if x != null => s"$fieldName $fieldType COMMENT '${matcher.group(10)}'"
                  case _ => s"$fieldName $fieldType"
                }
              }
            }
          case _ => null
        }
      }
      if (line != null) {
        sqlBuffer
          .append(line)
          .append(if (line.toUpperCase.trim.startsWith("CREATE ")) "\n" else ",\n")
      }
    }
    scanner.close()
    sqlBuffer.toString.trim.replaceAll(",$", "\n)") match {
      case x if postfix != null => s"$x $postfix"
      case x => x
    }
  }