in streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala [75:150]
/**
 * Verifies a Flink SQL script statement by statement.
 *
 * The script is split with `SqlCommandParser.parseSQL`. `SET` statements on the
 * `TableConfigOptions.TABLE_SQL_DIALECT` key switch the dialect used for the following
 * statements, `BEGIN STATEMENT SET` / `END STATEMENT SET` are only logged, and every other
 * statement is handed to the Calcite parser (looked up reflectively) when the current dialect
 * is `DEFAULT`. Verification stops at the first statement that fails.
 *
 * @param sql the Flink SQL script to verify
 * @return the default (successful) result when no statement failed and at least one `INSERT`
 *         was seen; otherwise a `SYNTAX_ERROR` result describing the first failure, or the
 *         missing `INSERT`
 */
def verifySql(sql: String): FlinkSqlValidationResult = {
  // The callback makes verifySql return whatever result the parser passes to it
  // (non-local return). NOTE(review): presumably invoked on split/parse failures; confirm
  // against SqlCommandParser.
  val sqlCommands = SqlCommandParser.parseSQL(sql, r => return r)
  var sqlDialect = SqlDialect.DEFAULT.name().toLowerCase()
  var hasInsert = false
  for (call <- sqlCommands) {
    val command = call.command
    command match {
      case SET | RESET =>
        // The first operand is only inspected here, and through headOption, so statements
        // without operands no longer fail with NoSuchElementException.
        val setsDialect = command == SET &&
          call.operands.headOption.exists(_ == TableConfigOptions.TABLE_SQL_DIALECT.key())
        if (setsDialect) {
          sqlDialect = call.operands.last
        }
      case BEGIN_STATEMENT_SET | END_STATEMENT_SET =>
        logWarn(s"SQL Client Syntax: ${call.command.name} ")
      case _ =>
        if (command == INSERT) {
          hasInsert = true
        }
        Try {
          // Try the Flink 1.12 parser class first, then the one used by 1.13 and later.
          val calciteClass = Try(Class.forName(FLINK112_CALCITE_PARSER_CLASS))
            .getOrElse(Class.forName(FLINK113_PLUS_CALCITE_PARSER_CLASS))
          sqlDialect.toUpperCase() match {
            // NOTE(review): Scala cases do not fall through, so statements under the HIVE
            // dialect are not parsed here at all. Confirm this skip is intentional.
            case "HIVE" =>
            case "DEFAULT" =>
              val parser = calciteClass
                .getConstructor(Array(classOf[Config]): _*)
                .newInstance(sqlParserConfigMap(sqlDialect.toUpperCase()))
              val method =
                parser.getClass.getDeclaredMethod("parse", classOf[String])
              method.setAccessible(true)
              method.invoke(parser, call.originSql)
            case _ =>
              throw new UnsupportedOperationException(s"unsupported dialect: $sqlDialect")
          }
        } match {
          case Failure(e) =>
            val exception = ExceptionUtils.stringifyException(e)
            // If "Caused by:" is absent, indexOf is -1 and drop keeps the whole trace.
            val causedBy = exception.drop(exception.indexOf("Caused by:"))
            val cleanUpError = exception.replaceAll("[\r\n]", "")
            // A single regex pass: the previous find-then-extract pair evaluated the pattern
            // twice and could raise MatchError when the pattern matched only part of the text.
            SYNTAX_ERROR_REGEXP.findFirstMatchIn(cleanUpError) match {
              case Some(located) =>
                val line = located.group(1)
                val column = located.group(2)
                // The parser reports lines relative to the statement; shift to script lines.
                val errorLine = call.lineStart + line.toInt - 1
                return FlinkSqlValidationResult(
                  success = false,
                  failedType = FlinkSqlValidationFailedType.SYNTAX_ERROR,
                  lineStart = call.lineStart,
                  lineEnd = call.lineEnd,
                  errorLine = errorLine,
                  errorColumn = column.toInt,
                  sql = call.originSql,
                  exception = causedBy.replaceAll(s"at\\sline\\s$line", s"at line $errorLine"))
              case None =>
                return FlinkSqlValidationResult(
                  success = false,
                  failedType = FlinkSqlValidationFailedType.SYNTAX_ERROR,
                  lineStart = call.lineStart,
                  lineEnd = call.lineEnd,
                  sql = call.originSql,
                  exception = causedBy)
            }
          case _ =>
        }
    }
  }
  if (hasInsert) {
    FlinkSqlValidationResult()
  } else {
    val noInsertError = "No 'INSERT' statement to trigger the execution of the Flink job."
    if (sqlCommands.isEmpty) {
      // No statements at all: there is no line range to report, so keep the defaults
      // instead of failing on head/last of an empty collection.
      FlinkSqlValidationResult(
        success = false,
        failedType = FlinkSqlValidationFailedType.SYNTAX_ERROR,
        exception = noInsertError)
    } else {
      FlinkSqlValidationResult(
        success = false,
        failedType = FlinkSqlValidationFailedType.SYNTAX_ERROR,
        lineStart = sqlCommands.head.lineStart,
        lineEnd = sqlCommands.last.lineEnd,
        exception = noInsertError)
    }
  }
}