in flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserSemanticAnalyzer.java [1285:1392]
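/**
 * Phase 1 handling of an explicit target column list in "INSERT INTO foo (a, b, ...)":
 * records the destination schema for the clause and validates that every listed column is
 * either a regular (non-partition) column or a dynamic partition column of the target table.
 */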
private void handleInsertStatementSpecPhase1(
HiveParserASTNode ast,
HiveParserQBParseInfo qbp,
HiveParserBaseSemanticAnalyzer.Phase1Ctx ctx1)
throws SemanticException {
HiveParserASTNode tabColName = (HiveParserASTNode) ast.getChild(1);
if (ast.getType() == HiveASTParser.TOK_INSERT_INTO
&& tabColName != null
&& tabColName.getType() == HiveASTParser.TOK_TABCOLNAME) {
// we have "insert into foo(a,b)..."; parser will enforce that 1+ columns are listed if
// TOK_TABCOLNAME is present
List<String> targetColNames = new ArrayList<>();
for (Node col : tabColName.getChildren()) {
assert ((HiveParserASTNode) col).getType() == HiveASTParser.Identifier
: "expected token "
+ HiveASTParser.Identifier
+ " found "
+ ((HiveParserASTNode) col).getType();
targetColNames.add(((HiveParserASTNode) col).getText());
}
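// resolve the (optionally db-qualified) target table name against the current catalog and database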
String fullTableName =
getUnescapedName(
(HiveParserASTNode) ast.getChild(0).getChild(0),
catalogRegistry.getCurrentCatalog(),
catalogRegistry.getCurrentDatabase());
qbp.setDestSchemaForClause(ctx1.dest, targetColNames);
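// reject duplicate column names in the insert schema specification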
Set<String> targetColumns = new HashSet<>(targetColNames);
if (targetColNames.size() != targetColumns.size()) {
throw new SemanticException(
HiveParserUtils.generateErrorMessage(
tabColName,
"Duplicate column name detected in "
+ fullTableName
+ " table schema specification"));
}
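// remove the table's regular (non-partition) columns from the specified set; anything left
// over must be a dynamic partition column, otherwise it is an error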
CatalogTable targetTable = getCatalogTable(fullTableName, qb);
Set<String> partitionColumns = new HashSet<>(targetTable.getPartitionKeys());
ResolvedSchema resolvedSchema =
((ResolvedCatalogTable) targetTable).getResolvedSchema();
for (String column : resolvedSchema.getColumnNames()) {
// parser only allows foo(a,b), not foo(foo.a, foo.b)
// only consider non-partition columns
if (!partitionColumns.contains(column)) {
targetColumns.remove(column);
}
}
// here we need to see if remaining columns are dynamic partition columns
if (!targetColumns.isEmpty()) {
/* We just checked the user-specified schema columns against the regular table columns and found some which are not
'regular'. Now check whether they are dynamic partition columns.
For dynamic partitioning,
Given "create table multipart(a int, b int) partitioned by (c int, d int);"
for "insert into multipart partition(c='1',d)(d,a) values(2,3);" we expect parse tree to look like this
(TOK_INSERT_INTO
(TOK_TAB
(TOK_TABNAME multipart)
(TOK_PARTSPEC
(TOK_PARTVAL c '1')
(TOK_PARTVAL d)
)
)
(TOK_TABCOLNAME d a)
)*/
List<String> dynamicPartitionColumns = new ArrayList<>();
if (ast.getChild(0) != null && ast.getChild(0).getType() == HiveASTParser.TOK_TAB) {
HiveParserASTNode tokTab = (HiveParserASTNode) ast.getChild(0);
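// look for a partition spec attached to the target table node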
HiveParserASTNode tokPartSpec =
(HiveParserASTNode)
tokTab.getFirstChildWithType(HiveASTParser.TOK_PARTSPEC);
if (tokPartSpec != null) {
for (Node n : tokPartSpec.getChildren()) {
HiveParserASTNode tokPartVal = null;
if (n instanceof HiveParserASTNode) {
tokPartVal = (HiveParserASTNode) n;
}
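// a TOK_PARTVAL node with a single child names a dynamic partition column (no value supplied)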
if (tokPartVal != null
&& tokPartVal.getType() == HiveASTParser.TOK_PARTVAL
&& tokPartVal.getChildCount() == 1) {
assert tokPartVal.getChild(0).getType() == HiveASTParser.Identifier
: "Expected column name; found tokType="
+ tokPartVal.getChild(0).getType();
dynamicPartitionColumns.add(tokPartVal.getChild(0).getText());
}
}
}
}
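// drop the dynamic partition columns; whatever remains was not matched by any column of the table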
for (String colName : dynamicPartitionColumns) {
targetColumns.remove(colName);
}
if (!targetColumns.isEmpty()) {
// Found some columns in the user-specified schema which are neither regular nor
// dynamic partition columns
throw new SemanticException(
HiveParserUtils.generateErrorMessage(
tabColName,
"'"
+ (targetColumns.size() == 1
? targetColumns.iterator().next()
: targetColumns)
+ "' in insert schema specification "
+ (targetColumns.size() == 1 ? "is" : "are")
+ " not found among regular columns of "
+ fullTableName
+ " nor dynamic partition columns."));
}
}
}
}