private[streampark] def executeSql()

in streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala [37:152]


  /**
   * Parses a Flink SQL script and runs its statements one by one against `context`.
   *
   * The script text is read from `parameter`: under `KEY_FLINK_SQL()` when `sql` is null or
   * empty, otherwise under the key named by `sql`. Statements are dispatched by command:
   *  - SHOW / DESC / DESCRIBE / EXPLAIN results are passed to the callback;
   *  - SET / RESET / RESET_ALL modify the configuration of `context`;
   *  - INSERT statements are collected into a single statement set that is executed once all
   *    statements have been processed;
   *  - SELECT is rejected with an exception;
   *  - any other statement is executed directly while holding `lock`.
   *
   * @param sql
   *   parameter key under which the SQL script is stored; null or empty selects `KEY_FLINK_SQL()`
   * @param parameter
   *   source of the SQL script text
   * @param context
   *   table environment the statements are executed against
   * @param callbackFunc
   *   receiver of displayable results; when null the results are written with `logInfo`
   * @throws IllegalArgumentException
   *   if the resolved SQL script is null or blank
   * @throws RuntimeException
   *   if the script contains a SELECT statement or contains no INSERT statement
   */
  private[streampark] def executeSql(
      sql: String,
      parameter: ParameterTool,
      context: TableEnvironment)(implicit callbackFunc: String => Unit = null): Unit = {
    val flinkSql: String =
      if (sql == null || sql.isEmpty) parameter.get(KEY_FLINK_SQL()) else parameter.get(sql)
    require(flinkSql != null && flinkSql.trim.nonEmpty, "verify failed: flink sql cannot be empty")

    // Route a displayable result to the caller's callback, falling back to the log.
    def callback(r: String): Unit = {
      if (callbackFunc == null) logInfo(r) else callbackFunc(r)
    }

    // Set as soon as one INSERT is queued; without any INSERT there is nothing to submit.
    var hasInsert = false
    val statementSet = context.createStatementSet()
    SqlCommandParser
      .parseSQL(flinkSql)
      .foreach(
        x => {
          // First operand of the statement, or null when the statement has none.
          val args = if (x.operands.isEmpty) null else x.operands.head
          val command = x.command.name
          x.command match {
            // For display sql statement result information
            case SHOW_CATALOGS =>
              val catalogs = context.listCatalogs
              callback(s"$command: ${catalogs.mkString("\n")}")
            case SHOW_CURRENT_CATALOG =>
              val catalog = context.getCurrentCatalog
              callback(s"$command: $catalog")
            case SHOW_DATABASES =>
              val databases = context.listDatabases
              callback(s"$command: ${databases.mkString("\n")}")
            case SHOW_CURRENT_DATABASE =>
              val database = context.getCurrentDatabase
              callback(s"$command: $database")
            case SHOW_TABLES =>
              val tables = context.listTables().filter(!_.startsWith("UnnamedTable"))
              callback(s"$command: ${tables.mkString("\n")}")
            case SHOW_FUNCTIONS =>
              val functions = context.listUserDefinedFunctions()
              callback(s"$command: ${functions.mkString("\n")}")
            case SHOW_MODULES =>
              val modules = context.listModules()
              callback(s"$command: ${modules.mkString("\n")}")
            case DESC | DESCRIBE =>
              val schema = context.scan(args).getSchema
              val builder = new mutable.StringBuilder()
              builder.append("Column\tType\n")
              // Field indices are zero-based, so the last valid index is getFieldCount - 1;
              // an inclusive bound would request a field that does not exist and fail on get().
              for (i <- 0 until schema.getFieldCount) {
                builder.append(
                  schema.getFieldName(i).get() + "\t" + schema.getFieldDataType(i).get() + "\n")
              }
              callback(builder.toString())
            case EXPLAIN =>
              val tableResult = context.executeSql(x.originSql)
              val r = tableResult.collect().next().getField(0).toString
              callback(r)
            // For specific statement, such as: SET/RESET/INSERT/SELECT
            case SET =>
              // operands(0) is the key (same value as args), operands(1) the value to assign.
              val operand = x.operands(1)
              logInfo(s"$command: $args --> $operand")
              context.getConfig.getConfiguration.setString(args, operand)
            case RESET | RESET_ALL =>
              // Reach the configuration's private `confData` map via reflection so entries can
              // be removed from it directly.
              val confDataField = classOf[Configuration].getDeclaredField("confData")
              confDataField.setAccessible(true)
              val confData = confDataField
                .get(context.getConfig.getConfiguration)
                .asInstanceOf[util.HashMap[String, AnyRef]]
              confData.synchronized {
                if (x.command == RESET) {
                  confData.remove(args)
                } else {
                  confData.clear()
                }
              }
              logInfo(s"$command: $args")
            case BEGIN_STATEMENT_SET | END_STATEMENT_SET =>
              // Nothing to execute: all INSERT statements already share one statement set.
              logWarn(s"SQL Client Syntax: ${x.command.name} ")
            case INSERT =>
              statementSet.addInsertSql(x.originSql)
              hasInsert = true
            case SELECT =>
              logError("StreamPark dose not support 'SELECT' statement now!")
              throw new RuntimeException("StreamPark dose not support 'select' statement now!")
            case _ =>
              try {
                lock.lock()
                context.executeSql(x.originSql)
                logInfo(s"$command:$args")
              } finally {
                // Guarded so a failure inside lock() itself does not trigger an invalid unlock.
                if (lock.isHeldByCurrentThread) {
                  lock.unlock()
                }
              }
          }
        })

    if (hasInsert) {
      statementSet.execute() match {
        case t if t != null =>
          // The job client may be absent; any failure while resolving the id is ignored.
          Try(t.getJobClient.get.getJobID).getOrElse(null) match {
            case x if x != null => logInfo(s"jobId:$x")
            case _ =>
          }
        case _ =>
      }
    } else {
      logError("No 'INSERT' statement to trigger the execution of the Flink job.")
      throw new RuntimeException("No 'INSERT' statement to trigger the execution of the Flink job.")
    }

    logInfo(
      s"\n\n\n==============flinkSql==============\n\n $flinkSql\n\n============================\n\n\n")
  }