in streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala [36:91]
def parseSQL(
    sql: String,
    validationCallback: FlinkSqlValidationResult => Unit = null): List[SqlCommandCall] = {
  val sqlEmptyError = "verify failed: flink sql cannot be empty."
  require(sql != null && sql.trim.nonEmpty, sqlEmptyError)
  val sqlSegments = SqlSplitter.splitSql(sql)
  sqlSegments match {
    // An empty split result means the input contained no statements at all.
    case s if s.isEmpty =>
      if (validationCallback != null) {
        validationCallback(
          FlinkSqlValidationResult(
            success = false,
            failedType = FlinkSqlValidationFailedType.VERIFY_FAILED,
            exception = sqlEmptyError))
        null
      } else {
        throw new IllegalArgumentException(sqlEmptyError)
      }
    case segments =>
      val calls = new ListBuffer[SqlCommandCall]
      for (segment <- segments) {
        parseLine(segment) match {
          case Some(x) => calls += x
          // Segments that do not match any known SQL command are reported via the
          // callback (with their line range) or rejected with an exception.
          case _ =>
            if (validationCallback != null) {
              validationCallback(
                FlinkSqlValidationResult(
                  success = false,
                  failedType = FlinkSqlValidationFailedType.UNSUPPORTED_SQL,
                  lineStart = segment.start,
                  lineEnd = segment.end,
                  exception = s"unsupported sql",
                  sql = segment.sql
                ))
            } else {
              throw new UnsupportedOperationException(s"unsupported sql: ${segment.sql}")
            }
        }
      }
      // If no segment produced an executable command, the script is treated as a syntax error.
      calls.toList match {
        case c if c.isEmpty =>
          if (validationCallback != null) {
            validationCallback(
              FlinkSqlValidationResult(
                success = false,
                failedType = FlinkSqlValidationFailedType.VERIFY_FAILED,
                exception = "flink sql syntax error, no executable sql"))
            null
          } else {
            throw new UnsupportedOperationException("flink sql syntax error, no executable sql")
          }
        case r => r
      }
  }
}
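
A minimal usage sketch, assuming parseSQL is exposed on a SqlCommandParser object in org.apache.streampark.flink.core and that FlinkSqlValidationResult carries the fields used above (failedType, lineStart, lineEnd, exception); the table names and statements are illustrative only:

import org.apache.streampark.flink.core.{FlinkSqlValidationResult, SqlCommandParser}

object ParseSqlExample {
  def main(args: Array[String]): Unit = {
    val script =
      """
        |CREATE TABLE source_table (id INT) WITH ('connector' = 'datagen');
        |SELECT id FROM source_table;
        |""".stripMargin

    // With a callback supplied, validation failures are reported instead of thrown,
    // and parseSQL returns null for a failed script.
    val calls = SqlCommandParser.parseSQL(
      script,
      (result: FlinkSqlValidationResult) =>
        println(
          s"validation failed [${result.failedType}] " +
            s"lines ${result.lineStart}-${result.lineEnd}: ${result.exception}"))

    // When every segment parses, the returned list holds one SqlCommandCall per statement.
    if (calls != null) calls.foreach(println)
  }
}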