in streampark-spark/streampark-spark-core/src/main/scala/org/apache/streampark/spark/core/util/SqlCommandParser.scala [411:574]
/**
 * Splits a SQL script into its individual statements.
 *
 * A statement ends at a `;` that is outside quotes and comments, or at the end of the input.
 * Comment text is dropped from the statement (block comments whose opener is followed by `+`,
 * i.e. optimizer hints, are kept), while line breaks inside comments are preserved. Each
 * statement is prefixed with the line breaks of the previous statement so that its lines keep
 * their offset within the original script.
 *
 * @param sql the script to split; `null` or blank input yields an empty list
 * @return one [[SqlSegment]] per non-blank statement, in source order, holding the 1-based start
 *         line, the 1-based line of the terminating character and the padded statement text
 */
def splitSql(sql: String): List[SqlSegment] = {
  if (StringUtils.isBlank(sql)) {
    // null, empty or whitespace-only input contains no statement (also avoids an NPE on null)
    Nil
  } else {
    val lastIndex = sql.length - 1

    // 1-based line number -> whether a statement may start on that line: the line is not blank,
    // does not start with PARAM_PREFIX and is not inside a block comment whose opening line
    // starts with "/*".
    val lineDescriptor: mutable.HashMap[Int, Boolean] = {
      val scanner = new Scanner(sql)
      val descriptor = new mutable.HashMap[Int, Boolean]
      var lineNumber = 0
      var inBlockComment = false
      while (scanner.hasNextLine) {
        lineNumber += 1
        val line = scanner.nextLine().trim
        if (line.startsWith("/*")) inBlockComment = true
        val nonEmpty = StringUtils.isNotBlank(line) && !line.startsWith(PARAM_PREFIX)
        descriptor += lineNumber -> (nonEmpty && !inBlockComment)
        if (inBlockComment && line.endsWith("*/")) inBlockComment = false
      }
      descriptor
    }

    @tailrec
    def findStartLine(num: Int): Int =
      if (num >= lineDescriptor.size || lineDescriptor(num)) num
      else findStartLine(num + 1)

    // number of '\n' characters consumed so far
    var lineNum = 0
    // end line of the previously terminated statement (0 before the first one)
    var lastEndLine = 0

    // Line range of the statement terminated at the current character. The end is the 1-based
    // line of that character; the start is the first line after the previous terminator on which
    // a statement may begin, clamped so it never exceeds the end (e.g. `select 1; select 2`).
    def nextLineRange(): (Int, Int) = {
      val end = lineNum + 1
      val start = math.min(findStartLine(lastEndLine + 1), end)
      lastEndLine = end
      start -> end
    }

    val statements = ListBuffer[(String, (Int, Int))]()
    var query = new mutable.StringBuilder
    var multiLineComment = false
    var singleLineComment = false
    var singleQuoteString = false
    var doubleQuoteString = false

    // Ends the current statement. The line cursor always advances, even for blank statements
    // (`;;`, comment-only parts), but only non-blank statements are recorded, each paired with
    // its own line range so ranges can never drift out of step with statements.
    def terminateStatement(): Unit = {
      val range = nextLineRange()
      if (query.toString.trim.nonEmpty) {
        statements += (query.toString -> range)
        query = new mutable.StringBuilder
      }
    }

    for (idx <- 0 until sql.length) {
      val ch = sql.charAt(idx)
      if (ch == '\n') lineNum += 1

      if (singleLineComment && ch == '\n') {
        // a line break ends a single-line comment; keep it so line numbers are preserved
        singleLineComment = false
        query += ch
        // the script may end right after a trailing comment: the statement still needs a range
        if (idx == lastIndex) terminateStatement()
      } else {
        // the character right after "*/" is the first one outside a block comment
        if (multiLineComment && idx >= 2 && sql.charAt(idx - 1) == '/' && sql.charAt(idx - 2) == '*') {
          multiLineComment = false
        }
        // quote marks only open or close strings outside of comments
        if (!singleLineComment && !multiLineComment) {
          if (ch == '\'') {
            if (singleQuoteString) singleQuoteString = false
            else if (!doubleQuoteString) singleQuoteString = true
          } else if (ch == '"') {
            if (doubleQuoteString) doubleQuoteString = false
            else if (!singleQuoteString) doubleQuoteString = true
          }
        }
        val inString = singleQuoteString || doubleQuoteString
        // comment start marks; a block comment opener followed by '+' is a hint and stays in the statement
        if (!inString && !singleLineComment && !multiLineComment && idx < lastIndex) {
          if (isSingleLineComment(ch, sql.charAt(idx + 1))) {
            singleLineComment = true
          } else if (ch == '/' && sql.length > idx + 2
            && sql.charAt(idx + 1) == '*' && sql.charAt(idx + 2) != '+') {
            multiLineComment = true
          }
        }
        val inComment = singleLineComment || multiLineComment
        if (ch == ';' && !inString && !inComment) {
          terminateStatement()
        } else if (idx == lastIndex) {
          if (!inComment) query += ch
          terminateStatement()
        } else if (!inComment || ch == '\n') {
          // comment text is dropped, but its line breaks are kept
          query += ch
        }
      }
    }

    // Comment-only statements become blank lines of the preceding statement; every other
    // statement is padded with the line breaks of the previous one to keep its line offset.
    val refined = ListBuffer[(String, (Int, Int))]()
    // line breaks of comment-only statements seen before the first real statement
    var leadingBreaks = ""
    statements.foreach { case (statement, range) =>
      if (isSingleLineComment(statement) || isMultipleLineComment(statement)) {
        if (refined.isEmpty) {
          leadingBreaks += extractLineBreaks(statement)
        } else {
          val (previous, previousRange) = refined.last
          refined(refined.size - 1) = (previous + extractLineBreaks(statement)) -> previousRange
        }
      } else {
        val placeholder = refined.lastOption.fold(leadingBreaks) { case (previous, _) =>
          extractLineBreaks(previous)
        }
        refined += ((placeholder + statement) -> range)
      }
    }
    refined.map { case (text, (start, end)) => SqlSegment(start, end, text) }.toList
  }
}