in flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala [120:206]
/**
 * Validates the given SQL node with the supplied validator.
 *
 * The node is first visited by a [[PreValidateReWriter]]; if it is an [[ExtendedSqlNode]] its own
 * `validate()` is invoked as well. Statements that need no row type validation are then returned
 * as they are. Statements that wrap other statements get their nested statement(s) validated and
 * written back via `setOperand`. Everything else is passed to `validator.validate`.
 *
 * @param sqlNode
 *   the parsed node to validate
 * @param validator
 *   the validator handed to the pre-validation rewriter and used for plain statements
 * @return
 *   the validated node
 * @throws ValidationException
 *   wrapping any `RuntimeException` raised while validating
 */
private def validate(sqlNode: SqlNode, validator: FlinkCalciteSqlValidator): SqlNode = {
  try {
    sqlNode.accept(new PreValidateReWriter(validator, typeFactory))

    // do extended validation.
    sqlNode match {
      case extended: ExtendedSqlNode => extended.validate()
      case _ =>
    }

    // no need to validate row type for DDL and insert nodes.
    val kind = sqlNode.getKind
    val hasDdlLikeKind =
      kind.belongsTo(SqlKind.DDL) ||
        kind == SqlKind.CREATE_FUNCTION ||
        kind == SqlKind.DROP_FUNCTION ||
        kind == SqlKind.OTHER_DDL
    val skipsRowTypeValidation = hasDdlLikeKind || (sqlNode match {
      case _: SqlLoadModule | _: SqlShowCatalogs | _: SqlShowCurrentCatalog |
          _: SqlShowDatabases | _: SqlShowCurrentDatabase | _: SqlShowTables |
          _: SqlShowModels | _: SqlShowFunctions | _: SqlShowJars | _: SqlShowModules |
          _: SqlShowViews | _: SqlShowColumns | _: SqlShowPartitions |
          _: SqlShowProcedures | _: SqlShowJobs | _: SqlDescribeJob |
          _: SqlRichDescribeFunction | _: SqlRichDescribeModel | _: SqlRichDescribeTable |
          _: SqlUnloadModule | _: SqlUseModules | _: SqlBeginStatementSet |
          _: SqlEndStatementSet | _: SqlSet | _: SqlReset | _: SqlExecutePlan |
          _: SqlTruncateTable =>
        true
      case _ =>
        false
    })

    if (skipsRowTypeValidation) {
      sqlNode
    } else {
      sqlNode match {
        case explain: SqlRichExplain =>
          val validatedInner = explain.getStatement match {
            // only validate source here
            case explainedInsert: RichSqlInsert => validateRichSqlInsert(explainedInsert)
            case otherStatement => validate(otherStatement)
          }
          explain.setOperand(0, validatedInner)
          explain

        case stmtSet: SqlStatementSet =>
          // Replace every insert of the set by its validated counterpart, position by position.
          for ((singleInsert, position) <- stmtSet.getInserts.asScala.zipWithIndex) {
            stmtSet.setOperand(position, validate(singleInsert))
          }
          stmtSet

        case exec: SqlExecute =>
          exec.setOperand(0, validate(exec.getStatement))
          exec

        case richInsert: RichSqlInsert =>
          validateRichSqlInsert(richInsert)

        case compilePlan: SqlCompilePlan =>
          val compiledStatement = compilePlan.getOperandList.get(0)
          compilePlan.setOperand(0, validate(compiledStatement))
          compilePlan

        case compileAndExec: SqlCompileAndExecutePlan =>
          val compiledStatement = compileAndExec.getOperandList.get(0)
          compileAndExec.setOperand(0, validate(compiledStatement))
          compileAndExec

        // for call procedure statement
        case procedureNode if procedureNode.getKind == SqlKind.PROCEDURE_CALL =>
          val procedureCall = procedureNode.asInstanceOf[SqlBasicCall]
          // Each operand of the call is validated on its own and written back in place.
          for ((argument, position) <- procedureCall.getOperandList.asScala.zipWithIndex) {
            procedureCall.setOperand(position, validate(argument))
          }
          procedureCall

        case _ =>
          validator.validate(sqlNode)
      }
    }
  } catch {
    case e: RuntimeException =>
      throw new ValidationException(s"SQL validation failed. ${e.getMessage}", e)
  }
}