in streampark-common/src/main/scala/org/apache/streampark/common/util/SqlConvertUtils.scala [188:240]
private[this] def convertSql(
sql: String,
typeFunc: (String, String) => String = null,
keyFunc: String => String = null,
postfix: String = null): String = {
val formattedSql = formatSql(sql)
val scanner = new Scanner(formattedSql)
val sqlBuffer = new StringBuffer()
while (scanner.hasNextLine) {
val line = {
val line = scanner.nextLine().trim
line.toUpperCase().trim match {
case a if a.startsWith("CREATE ") => line
case a if a.startsWith("PRIMARY KEY ") =>
keyFunc match {
case null => null
case _ => keyFunc(line)
}
case a if !a.startsWith("UNIQUE KEY ") && !a.startsWith("KEY ") =>
val matcher = FIELD_REGEXP.matcher(line)
if (!matcher.find()) null;
else {
val fieldName = matcher.group(1)
val dataType = (matcher.group(2), matcher.group(3)) match {
case (_, b) if b != null => b
case (a, _) => a
}
val length = matcher.group(4)
if (dataType == null) null;
else {
val fieldType = typeFunc(dataType, length)
matcher.group(8) match {
case x if x != null => s"$fieldName $fieldType COMMENT '${matcher.group(10)}'"
case _ => s"$fieldName $fieldType"
}
}
}
case _ => null
}
}
if (line != null) {
sqlBuffer
.append(line)
.append(if (line.toUpperCase.trim.startsWith("CREATE ")) "\n" else ",\n")
}
}
scanner.close()
sqlBuffer.toString.trim.replaceAll(",$", "\n)") match {
case x if postfix != null => s"$x $postfix"
case x => x
}
}