def verifySql()

in streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala [72:149]


  /**
   * Validates a Flink SQL script statement by statement.
   *
   * The script is split into commands by `SqlCommandParser.parseSQL`. Each command is then handled
   * according to its type:
   *  - `SET` of the `table.sql-dialect` key switches the dialect used for the following statements;
   *    every other `SET` and every `RESET` is accepted without further checks.
   *  - `BEGIN STATEMENT SET` / `END` are only logged as SQL-client syntax.
   *  - Any other statement is parsed reflectively with the Flink Calcite parser when the current
   *    dialect is `default`. With the `hive` dialect the statement is accepted unparsed, and any
   *    other dialect is reported as a failure.
   *
   * Validation stops at the first failing statement.
   *
   * @param sql the complete SQL script to validate
   * @return a successful result when every statement passed and at least one `INSERT` is present;
   *         otherwise a failed result of type `SYNTAX_ERROR` describing the offending statement
   *         (or the missing `INSERT`)
   */
  def verifySql(sql: String): FlinkSqlValidationResult = {
    // The callback performs a non-local return: a result handed to it by the parser immediately
    // becomes the result of verifySql. NOTE(review): presumably called when the parser itself
    // rejects the script - confirm against SqlCommandParser.parseSQL.
    val sqlCommands = SqlCommandParser.parseSQL(sql, r => return r)

    // Resolved at most once per call instead of once per statement. It is still first forced
    // inside the Try below, so a missing parser class is reported as a validation failure.
    lazy val calciteParserClass: Class[_] = Try(Class.forName(FLINK112_CALCITE_PARSER_CLASS))
      .getOrElse(Class.forName(FLINK113_PLUS_CALCITE_PARSER_CLASS))

    var sqlDialect = "default"
    var hasInsert = false
    for (call <- sqlCommands) {
      val command = call.command
      command match {
        case SET =>
          // headOption: a command without operands must not abort validation with an exception.
          if (call.operands.headOption.contains(TableConfigOptions.TABLE_SQL_DIALECT.key())) {
            sqlDialect = call.operands.last
          }
        case RESET =>
        // nothing to validate
        case BEGIN_STATEMENT_SET | END_STATEMENT_SET =>
          logWarn(s"SQL Client Syntax: ${command.name} ")
        case _ =>
          if (command == INSERT) {
            hasInsert = true
          }
          Try {
            val calciteClass = calciteParserClass
            // Locale.ROOT keeps the dialect name stable regardless of the JVM default locale
            // (e.g. Turkish upper-casing of 'i').
            sqlDialect.toUpperCase(java.util.Locale.ROOT) match {
              case "HIVE" =>
              // No Calcite parsing for the Hive dialect.
              // NOTE(review): looks intentional (Hive syntax is not Calcite syntax) - confirm.
              case dialect @ "DEFAULT" =>
                val parser = calciteClass
                  .getConstructor(Array(classOf[Config]): _*)
                  .newInstance(sqlParserConfigMap(dialect))
                val method = parser.getClass.getDeclaredMethod("parse", classOf[String])
                method.setAccessible(true)
                method.invoke(parser, call.originSql)
              case _ =>
                throw new UnsupportedOperationException(s"unsupported dialect: $sqlDialect")
            }
          } match {
            case Failure(e) =>
              val exception = Utils.stringifyException(e)
              // When there is no "Caused by:" section indexOf yields -1 and drop(-1) keeps the
              // whole text.
              val causedBy = exception.drop(exception.indexOf("Caused by:"))
              val cleanUpError = exception.replaceAll("[\r\n]", "")
              // A single search both tests for and extracts the position; unlike the regex
              // extractor it does not require the pattern to cover the entire text.
              SYNTAX_ERROR_REGEXP.findFirstMatchIn(cleanUpError) match {
                case Some(position) =>
                  val line = position.group(1)
                  val column = position.group(2)
                  // Only the statement text is handed to the parser, so the reported line is
                  // relative to the statement; shift it to a line number within the script.
                  val errorLine = call.lineStart + line.toInt - 1
                  return FlinkSqlValidationResult(
                    success = false,
                    failedType = FlinkSqlValidationFailedType.SYNTAX_ERROR,
                    lineStart = call.lineStart,
                    lineEnd = call.lineEnd,
                    errorLine = errorLine,
                    errorColumn = column.toInt,
                    sql = call.originSql,
                    // (?!\d) stops "at line 1" from also rewriting the prefix of "at line 12".
                    exception =
                      causedBy.replaceAll(s"at\\sline\\s${line}(?!\\d)", s"at line $errorLine")
                  )
                case None =>
                  return FlinkSqlValidationResult(
                    success = false,
                    failedType = FlinkSqlValidationFailedType.SYNTAX_ERROR,
                    lineStart = call.lineStart,
                    lineEnd = call.lineEnd,
                    sql = call.originSql,
                    exception = causedBy
                  )
              }
            case _ =>
          }
      }
    }

    if (hasInsert) {
      FlinkSqlValidationResult()
    } else {
      val noInsert = "No 'INSERT' statement to trigger the execution of the Flink job."
      (sqlCommands.headOption, sqlCommands.lastOption) match {
        case (Some(first), Some(last)) =>
          FlinkSqlValidationResult(
            success = false,
            failedType = FlinkSqlValidationFailedType.SYNTAX_ERROR,
            lineStart = first.lineStart,
            lineEnd = last.lineEnd,
            exception = noInsert
          )
        case _ =>
          // No statements at all: there is no line range to report, keep the default values.
          FlinkSqlValidationResult(
            success = false,
            failedType = FlinkSqlValidationFailedType.SYNTAX_ERROR,
            exception = noInsert
          )
      }
    }
  }