in streampark-common/src/main/scala/org/apache/streampark/common/util/SqlConvertUtils.scala [98:173]
/**
 * Re-flows the parenthesised section of a SQL statement (everything between the first `(` and
 * the last `)`) so that each comma-terminated segment sits on its own line.
 *
 * The section is first split after every comma. Two kinds of segment are then glued back
 * together, because the comma that split them was not a real separator:
 *  - a segment that ends with a comma while a `(` is still open, e.g. `decimal(10,` + `2),`;
 *  - a segment containing `comment '` / `comment "` whose quoted text has not been closed yet.
 *
 * Every line is trimmed before it is re-joined, so whitespace that followed such a comma is
 * dropped (`'a, b'` becomes `'a,b'`).
 *
 * @param sql
 *   the statement to format
 * @return
 *   the text before the first `(`, the re-flowed section, and the text after the last `)`;
 *   `sql` unchanged when it has no `(` or no `)` after the first `(`
 */
private[this] def formatSql(sql: String): String = {
  val open = sql.indexOf("(")
  val close = sql.lastIndexOf(")")
  if (open < 0 || close < open) {
    // Nothing to re-flow; substring() below would otherwise throw on these indices.
    sql
  } else {
    // A segment ending in "," that still has an unclosed "(" in front of that comma.
    val openLengthRegexp = "(.*?)\\s*\\([^\\\\)|^\\n]+,$".r
    // Start of a quoted comment; group 2 captures the quote character that opened it.
    val commentStart = Pattern.compile("(comment)\\s+(['\"])", Pattern.CASE_INSENSITIVE)

    val body = sql
      .substring(open, close + 1)
      .replaceAll("\r|\n|\r\n", "")
      .replaceFirst("\\(", "(\n")
      .replaceFirst("\\)$", "\n)")
      .replaceAll(",", ",\n")

    // An indexed, ordered sequence: the join helpers address "the next line" by position.
    val scanner = new Scanner(body)
    val lines: Vector[String] =
      try {
        val collected = Vector.newBuilder[String]
        while (scanner.hasNextLine) {
          collected += scanner.nextLine().trim
        }
        collected.result()
      } finally {
        scanner.close()
      }

    // Appends following lines until the quoted comment in `segment` is closed.
    // Returns the index of the last line consumed together with the joined text.
    @tailrec def commentJoin(index: Int, segment: String): (Int, String) = {
      val matcher = commentStart.matcher(segment)
      if (!matcher.find()) {
        index -> segment
      } else {
        val quote = matcher.group(2)
        // Drop the opening `comment <quote>` and any backslash-escaped quotes, so that a
        // trailing quote (optionally followed by a comma) can only be the closing one.
        val cleaned = commentStart
          .matcher(segment)
          .replaceFirst("")
          .replace(s"\\$quote", "")
        val closed = cleaned.endsWith(quote) || cleaned.endsWith(s"$quote,")
        // Stop at the final line as well: an unterminated comment must not run off the end.
        if (closed || index + 1 >= lines.length) {
          index -> segment
        } else {
          commentJoin(index + 1, s"$segment${lines(index + 1)}")
        }
      }
    }

    // Appends following lines while `segment` ends inside an open "(", then hands the
    // result to commentJoin. Returns the last consumed index together with the joined text.
    @tailrec def lengthJoin(index: Int, segment: String): (Int, String) = {
      val insideParens = openLengthRegexp.findFirstIn(segment).isDefined
      if (insideParens && index + 1 < lines.length) {
        lengthJoin(index + 1, s"${segment.trim}${lines(index + 1)}")
      } else {
        commentJoin(index, segment)
      }
    }

    val formatted = new StringBuilder(sql.substring(0, open))

    // Emits one (possibly joined) segment per step and resumes after the last consumed line.
    @tailrec def emit(index: Int): Unit = {
      if (index < lines.length) {
        val (last, segment) = lengthJoin(index, lines(index))
        formatted.append(segment).append("\n")
        emit(last + 1)
      }
    }
    emit(0)

    formatted.toString.trim.concat(sql.substring(close + 1))
  }
}