private[streampark] def executeSql()

in streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala [39:167]


  private[streampark] def executeSql(
      sql: String,
      parameter: ParameterTool,
      context: TableEnvironment)(implicit callbackFunc: String => Unit = null): Unit = {

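    // The `sql` argument is a parameter key, not the statement text itself;
    // when it is blank, fall back to the default KEY_FLINK_SQL parameter key.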
    val flinkSql: String =
      if (StringUtils.isBlank(sql)) parameter.get(KEY_FLINK_SQL())
      else parameter.get(sql)
    require(StringUtils.isNotBlank(flinkSql), "verification failed: flink sql cannot be empty")

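    // Route human-readable results through the caller-supplied callback,
    // falling back to plain logging when none is given.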
    def callback(r: String): Unit = {
      callbackFunc match {
        case null => logInfo(r)
        case x => x(r)
      }
    }

    val runMode = parameter.get(ExecutionOptions.RUNTIME_MODE.key())

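    // All INSERT statements are collected into a single StatementSet so that
    // they are submitted together as one Flink job.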
    var hasInsert = false
    val statementSet = context.createStatementSet()
    SqlCommandParser
      .parseSQL(flinkSql)
      .foreach(x => {
        val args = if (x.operands.isEmpty) null else x.operands.head
        val command = x.command.name
        x.command match {
          // Statements whose results are displayed back through the callback
          case SHOW_CATALOGS =>
            val catalogs = context.listCatalogs
            callback(s"$command: ${catalogs.mkString("\n")}")
          case SHOW_CURRENT_CATALOG =>
            val catalog = context.getCurrentCatalog
            callback(s"$command: $catalog")
          case SHOW_DATABASES =>
            val databases = context.listDatabases
            callback(s"$command: ${databases.mkString("\n")}")
          case SHOW_CURRENT_DATABASE =>
            val database = context.getCurrentDatabase
            callback(s"$command: $database")
          case SHOW_TABLES =>
            val tables =
              context.listTables().filter(!_.startsWith("UnnamedTable"))
            callback(s"$command: ${tables.mkString("\n")}")
          case SHOW_FUNCTIONS =>
            val functions = context.listUserDefinedFunctions()
            callback(s"$command: ${functions.mkString("\n")}")
          case SHOW_MODULES =>
            val modules = context.listModules()
            callback(s"$command: ${modules.mkString("\n")}")
          case DESC | DESCRIBE =>
            val schema = context.scan(args).getSchema
            val builder = new mutable.StringBuilder()
            builder.append("Column\tType\n")
            // Use `until`: field indices run from 0 to fieldCount - 1.
            for (i <- 0 until schema.getFieldCount) {
              builder.append(
                s"${schema.getFieldName(i).get()}\t${schema.getFieldDataType(i).get()}\n")
            }
            callback(builder.toString())
          case EXPLAIN =>
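            // EXPLAIN returns a single-row result whose first field holds the plan text.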
            val tableResult = context.executeSql(x.originSql)
            val r = tableResult.collect().next().getField(0).toString
            callback(r)
          // Statements with dedicated handling: SET/RESET/INSERT/SELECT, etc.
          case SET =>
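            // operands are (key, value): apply the pair directly to the table configuration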
            val operand = x.operands(1)
            logInfo(s"$command: $args --> $operand")
            context.getConfig.getConfiguration.setString(args, operand)
          case RESET | RESET_ALL =>
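            // Mutate Configuration's internal confData map via reflection to
            // drop a single key (RESET) or clear everything (RESET ALL).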
            val confDataField =
              classOf[Configuration].getDeclaredField("confData")
            confDataField.setAccessible(true)
            val confData = confDataField
              .get(context.getConfig.getConfiguration)
              .asInstanceOf[util.HashMap[String, AnyRef]]
            confData.synchronized {
              if (x.command == RESET) {
                confData.remove(args)
              } else {
                confData.clear()
              }
            }
            logInfo(s"$command: $args")
          case BEGIN_STATEMENT_SET | END_STATEMENT_SET =>
            logWarn(s"SQL Client Syntax: ${x.command.name}")
          case INSERT =>
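            // Defer execution: INSERT statements are batched into the StatementSet
            // and submitted together once parsing completes.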
            statementSet.addInsertSql(x.originSql)
            hasInsert = true
          case SELECT =>
            logError("StreamPark does not support the 'SELECT' statement yet!")
            throw new RuntimeException("StreamPark does not support the 'SELECT' statement yet!")
          case DELETE | UPDATE =>
            AssertUtils.required(
              runMode != "STREAMING",
              s"Currently, ${command.toUpperCase()} statement only supports in batch mode, " +
                s"and it requires the target table connector implements the SupportsRowLevelDelete, " +
                s"For more details please refer to: https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/$command")
          case _ =>
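            // Any other statement (typically DDL) executes immediately;
            // the lock serializes these calls across threads.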
            try {
              lock.lock()
              context.executeSql(x.originSql)
              logInfo(s"$command: $args")
            } finally {
              if (lock.isHeldByCurrentThread) {
                lock.unlock()
              }
            }
        }
      })

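    // Only INSERT statements trigger actual job submission; fail fast when none was found.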
    if (hasInsert) {
      val result = statementSet.execute()
      if (result != null) {
        Try(result.getJobClient.get.getJobID).toOption
          .filter(_ != null)
          .foreach(jobId => logInfo(s"jobId: $jobId"))
      }
    } else {
      logError("No 'INSERT' statement to trigger the execution of the Flink job.")
      throw new RuntimeException("No 'INSERT' statement to trigger the execution of the Flink job.")
    }

    logInfo(
      s"\n\n\n==============flinkSql==============\n\n $flinkSql\n\n============================\n\n\n")
  }
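
A minimal driver sketch (an illustrative assumption, not part of this file): it presumes
FlinkSqlExecutor is the enclosing object and that the caller sits inside the
org.apache.streampark package, since executeSql is private[streampark]. ParameterTool
and TableEnvironment are standard Flink APIs; everything else here is hypothetical.

  import org.apache.flink.api.java.utils.ParameterTool
  import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

  object FlinkSqlExecutorExample {
    def main(args: Array[String]): Unit = {
      val parameter = ParameterTool.fromArgs(args)
      val tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode())
      // A blank `sql` argument makes executeSql fall back to the default
      // KEY_FLINK_SQL parameter key; the callback captures SHOW/DESC output.
      FlinkSqlExecutor.executeSql(null, parameter, tableEnv)(r => println(r))
    }
  }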