in streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala [39:167]
private[streampark] def executeSql(
sql: String,
parameter: ParameterTool,
context: TableEnvironment)(implicit callbackFunc: String => Unit = null): Unit = {
val flinkSql: String =
if (StringUtils.isBlank(sql)) parameter.get(KEY_FLINK_SQL())
else parameter.get(sql)
require(StringUtils.isNotBlank(flinkSql), "verify failed: flink sql cannot be empty")
def callback(r: String): Unit = {
callbackFunc match {
case null => logInfo(r)
case x => x(r)
}
}
val runMode = parameter.get(ExecutionOptions.RUNTIME_MODE.key())
var hasInsert = false
val statementSet = context.createStatementSet()
SqlCommandParser
.parseSQL(flinkSql)
.foreach(x => {
val args = if (x.operands.isEmpty) null else x.operands.head
val command = x.command.name
x.command match {
// For display sql statement result information
case SHOW_CATALOGS =>
val catalogs = context.listCatalogs
callback(s"$command: ${catalogs.mkString("\n")}")
case SHOW_CURRENT_CATALOG =>
val catalog = context.getCurrentCatalog
callback(s"$command: $catalog")
case SHOW_DATABASES =>
val databases = context.listDatabases
callback(s"$command: ${databases.mkString("\n")}")
case SHOW_CURRENT_DATABASE =>
val database = context.getCurrentDatabase
callback(s"$command: $database")
case SHOW_TABLES =>
val tables =
context.listTables().filter(!_.startsWith("UnnamedTable"))
callback(s"$command: ${tables.mkString("\n")}")
case SHOW_FUNCTIONS =>
val functions = context.listUserDefinedFunctions()
callback(s"$command: ${functions.mkString("\n")}")
case SHOW_MODULES =>
val modules = context.listModules()
callback(s"$command: ${modules.mkString("\n")}")
case DESC | DESCRIBE =>
val schema = context.scan(args).getSchema
val builder = new mutable.StringBuilder()
builder.append("Column\tType\n")
for (i <- 0 to schema.getFieldCount) {
builder.append(
schema.getFieldName(i).get() + "\t" + schema
.getFieldDataType(i)
.get() + "\n")
}
callback(builder.toString())
case EXPLAIN =>
val tableResult = context.executeSql(x.originSql)
val r = tableResult.collect().next().getField(0).toString
callback(r)
// For specific statement, such as: SET/RESET/INSERT/SELECT
case SET =>
val operand = x.operands(1)
logInfo(s"$command: $args --> $operand")
context.getConfig.getConfiguration.setString(args, operand)
case RESET | RESET_ALL =>
val confDataField =
classOf[Configuration].getDeclaredField("confData")
confDataField.setAccessible(true)
val confData = confDataField
.get(context.getConfig.getConfiguration)
.asInstanceOf[util.HashMap[String, AnyRef]]
confData.synchronized {
if (x.command == RESET) {
confData.remove(args)
} else {
confData.clear()
}
}
logInfo(s"$command: $args")
case BEGIN_STATEMENT_SET | END_STATEMENT_SET =>
logWarn(s"SQL Client Syntax: ${x.command.name} ")
case INSERT =>
statementSet.addInsertSql(x.originSql)
hasInsert = true
case SELECT =>
logError("StreamPark dose not support 'SELECT' statement now!")
throw new RuntimeException("StreamPark dose not support 'select' statement now!")
case DELETE | UPDATE =>
AssertUtils.required(
runMode != "STREAMING",
s"Currently, ${command.toUpperCase()} statement only supports in batch mode, " +
s"and it requires the target table connector implements the SupportsRowLevelDelete, " +
s"For more details please refer to: https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/$command")
case _ =>
try {
lock.lock()
val result = context.executeSql(x.originSql)
logInfo(s"$command:$args")
} finally {
if (lock.isHeldByCurrentThread) {
lock.unlock()
}
}
}
})
if (hasInsert) {
statementSet.execute() match {
case t if t != null =>
Try(t.getJobClient.get.getJobID).getOrElse(null) match {
case x if x != null => logInfo(s"jobId:$x")
case _ =>
}
case _ =>
}
} else {
logError("No 'INSERT' statement to trigger the execution of the Flink job.")
throw new RuntimeException("No 'INSERT' statement to trigger the execution of the Flink job.")
}
logInfo(
s"\n\n\n==============flinkSql==============\n\n $flinkSql\n\n============================\n\n\n")
}