in flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java [877:1109]
/**
 * Converts the AST of a Hive {@code CREATE TABLE} statement into an {@link Operation}.
 *
 * <p>Child 0 of {@code ast} is resolved to the identifier of the table to create. All remaining
 * first-level children are visited in order: storage-format clauses are consumed by the {@link
 * HiveParserStorageFormat}, every other clause is dispatched on its token type. While scanning,
 * the following combinations are rejected:
 *
 * <ol>
 *   <li>CREATE TABLE LIKE (CTLT) together with CREATE TABLE AS SELECT (CTAS);
 *   <li>CTLT or CTAS together with a column list that has already been seen;
 *   <li>CTAS together with partition columns or {@code EXTERNAL} that have already been seen.
 * </ol>
 *
 * <p>A regular CREATE TABLE returns whatever the multi-argument {@code convertCreateTable}
 * overload produces. CTAS additionally plans the query and wraps the create-table operation
 * into a {@link CreateTableASOperation}. CTLT always ends in an exception.
 *
 * @param ast root node of the CREATE TABLE statement
 * @return the operation for a regular CREATE TABLE or for CTAS
 * @throws SemanticException if the statement is a CREATE TABLE LIKE
 * @throws ValidationException if clauses that may not coexist are combined, or if an unknown
 *     child node is encountered
 */
private Operation convertCreateTable(HiveParserASTNode ast) throws SemanticException {
    ObjectIdentifier tableIdentifier =
            HiveParserBaseSemanticAnalyzer.getObjectIdentifier(
                    catalogRegistry, (HiveParserASTNode) ast.getChild(0));
    String dbDotTab =
            HiveParserBaseSemanticAnalyzer.getDotName(
                    new String[] {
                        tableIdentifier.getDatabaseName(), tableIdentifier.getObjectName()
                    });

    // The three flavours of the statement this method distinguishes.
    final int plainCreate = 0; // regular CREATE TABLE
    final int createLike = 1; // CREATE TABLE LIKE ... (CTLT)
    final int createAsSelect = 2; // CREATE TABLE AS SELECT ... (CTAS)
    int commandType = plainCreate;

    // State collected from the clauses of the statement.
    boolean ifNotExists = false;
    boolean isExt = false;
    boolean isTemporary = false;
    String comment = null;
    String location = null;
    Map<String, String> tblProps = null;
    HiveParserASTNode selectStmt = null;
    List<FieldSchema> cols = new ArrayList<>();
    List<FieldSchema> partCols = new ArrayList<>();
    List<PrimaryKey> primaryKeys = new ArrayList<>();
    List<NotNullConstraint> notNulls = new ArrayList<>();

    HiveParserBaseSemanticAnalyzer.HiveParserRowFormatParams rowFormatParams =
            new HiveParserBaseSemanticAnalyzer.HiveParserRowFormatParams();
    HiveParserStorageFormat storageFormat = new HiveParserStorageFormat(conf);

    LOG.info("Creating table " + dbDotTab + " position=" + ast.getCharPositionInLine());

    // Walk the first-level children (child 0 is the table name and was handled above). The
    // coexistence checks only look at what has been collected so far, so the outcome depends
    // on the order of the children.
    final int childCount = ast.getChildCount();
    for (int idx = 1; idx < childCount; idx++) {
        HiveParserASTNode node = (HiveParserASTNode) ast.getChild(idx);
        if (storageFormat.fillStorageFormat(node)) {
            // The node was a storage-format clause and has been consumed already.
            continue;
        }
        switch (node.getToken().getType()) {
            case HiveASTParser.TOK_IFNOTEXISTS:
                ifNotExists = true;
                break;
            case HiveASTParser.KW_EXTERNAL:
                isExt = true;
                break;
            case HiveASTParser.KW_TEMPORARY:
                isTemporary = true;
                break;
            case HiveASTParser.TOK_LIKETABLE:
                {
                    if (node.getChildCount() <= 0) {
                        // No source table given: nothing to do for this node.
                        break;
                    }
                    String likeTableName =
                            HiveParserBaseSemanticAnalyzer.getUnescapedName(
                                    (HiveParserASTNode) node.getChild(0));
                    if (likeTableName != null && commandType == createAsSelect) {
                        throw new ValidationException(
                                ErrorMsg.CTAS_CTLT_COEXISTENCE.getMsg());
                    }
                    if (likeTableName != null && !cols.isEmpty()) {
                        throw new ValidationException(
                                ErrorMsg.CTLT_COLLST_COEXISTENCE.getMsg());
                    }
                    commandType = createLike;
                    handleUnsupportedOperation("CREATE TABLE LIKE is not supported");
                    break;
                }
            case HiveASTParser.TOK_QUERY:
                // CTAS: must not be mixed with CTLT, a column list, partition columns or
                // EXTERNAL.
                if (commandType == createLike) {
                    throw new ValidationException(ErrorMsg.CTAS_CTLT_COEXISTENCE.getMsg());
                }
                if (!cols.isEmpty()) {
                    throw new ValidationException(ErrorMsg.CTAS_COLLST_COEXISTENCE.getMsg());
                }
                if (!partCols.isEmpty()) {
                    throw new ValidationException(ErrorMsg.CTAS_PARCOL_COEXISTENCE.getMsg());
                }
                if (isExt) {
                    throw new ValidationException(ErrorMsg.CTAS_EXTTBL_COEXISTENCE.getMsg());
                }
                selectStmt = node;
                commandType = createAsSelect;
                break;
            case HiveASTParser.TOK_TABCOLLIST:
                // Also fills primaryKeys and notNulls.
                cols =
                        HiveParserBaseSemanticAnalyzer.getColumns(
                                node, true, primaryKeys, notNulls);
                break;
            case HiveASTParser.TOK_TABLECOMMENT:
                {
                    String rawComment = node.getChild(0).getText();
                    comment = HiveParserBaseSemanticAnalyzer.unescapeSQLString(rawComment);
                    break;
                }
            case HiveASTParser.TOK_TABLEPARTCOLS:
                {
                    HiveParserASTNode partColList = (HiveParserASTNode) node.getChild(0);
                    partCols = HiveParserBaseSemanticAnalyzer.getColumns(partColList, false);
                    break;
                }
            case HiveASTParser.TOK_TABLEROWFORMAT:
                rowFormatParams.analyzeRowFormat(node);
                break;
            case HiveASTParser.TOK_TABLELOCATION:
                {
                    String rawLocation = node.getChild(0).getText();
                    location = HiveParserBaseSemanticAnalyzer.unescapeSQLString(rawLocation);
                    break;
                }
            case HiveASTParser.TOK_TABLEPROPERTIES:
                tblProps = getProps((HiveParserASTNode) node.getChild(0));
                break;
            case HiveASTParser.TOK_TABLESERIALIZER:
                {
                    // Child 0 carries the serde name and, optionally, its properties.
                    HiveParserASTNode serdeNode = (HiveParserASTNode) node.getChild(0);
                    storageFormat.setSerde(
                            HiveParserBaseSemanticAnalyzer.unescapeSQLString(
                                    serdeNode.getChild(0).getText()));
                    if (serdeNode.getChildCount() == 2) {
                        HiveParserBaseSemanticAnalyzer.readProps(
                                (HiveParserASTNode) (serdeNode.getChild(1).getChild(0)),
                                storageFormat.getSerdeProps());
                    }
                    break;
                }
            case HiveASTParser.TOK_ALTERTABLE_BUCKETS:
                handleUnsupportedOperation("Bucketed table is not supported");
                break;
            case HiveASTParser.TOK_TABLESKEWED:
                handleUnsupportedOperation("Skewed table is not supported");
                break;
            default:
                throw new ValidationException("Unknown AST node for CREATE TABLE: " + node);
        }
    }

    if (storageFormat.getStorageHandler() != null) {
        handleUnsupportedOperation("Storage handler table is not supported");
    }

    // Only CTAS is recorded as CREATETABLE_AS_SELECT; regular CREATE TABLE and CTLT both count
    // as CREATETABLE.
    queryState.setCommandType(
            commandType == createAsSelect
                    ? HiveOperation.CREATETABLE_AS_SELECT
                    : HiveOperation.CREATETABLE);

    storageFormat.fillDefaultStorageFormat(isExt, false);

    if (isTemporary) {
        if (!partCols.isEmpty()) {
            handleUnsupportedOperation(
                    "Partition columns are not supported on temporary tables");
        }
        handleUnsupportedOperation("Temporary hive table is not supported");
    }

    // Every command flavour starts by completing the table properties with the defaults.
    tblProps = addDefaultProperties(tblProps);

    if (commandType == createLike) {
        throw new SemanticException("CREATE TABLE LIKE is not supported yet");
    }

    if (commandType == plainCreate) {
        return convertCreateTable(
                dbDotTab,
                isExt,
                ifNotExists,
                isTemporary,
                cols,
                partCols,
                comment,
                location,
                tblProps,
                rowFormatParams,
                storageFormat,
                primaryKeys,
                notNulls);
    }

    if (commandType != createAsSelect) {
        throw new ValidationException("Unrecognized command.");
    }

    // CTAS: plan the query first. The planner is handed the (so far empty) column list before
    // the logical plan is generated.
    HiveParserCalcitePlanner calcitePlanner =
            hiveParser.createCalcitePlanner(context, queryState);
    calcitePlanner.setCtasCols(cols);
    RelNode queryRelNode = calcitePlanner.genLogicalPlan(selectStmt);

    // Describe the destination table so that the insert part of the CTAS can be derived.
    ResolvedSchema schema =
            HiveTableUtil.createResolvedSchema(cols, partCols, Collections.emptySet(), null);
    CatalogTable unresolvedDestTable =
            CatalogTable.of(
                    Schema.newBuilder().fromResolvedSchema(schema).build(),
                    comment,
                    HiveCatalog.getFieldNames(partCols),
                    tblProps);
    ResolvedCatalogTable destTable = new ResolvedCatalogTable(unresolvedDestTable, schema);

    Tuple4<ObjectIdentifier, QueryOperation, Map<String, String>, Boolean> insertOperationInfo =
            dmlHelper.createInsertOperationInfo(
                    queryRelNode,
                    destTable,
                    tableIdentifier,
                    Collections.emptyMap(),
                    Collections.emptyList(),
                    false);

    CreateTableOperation createTableOperation =
            convertCreateTable(
                    dbDotTab,
                    isExt,
                    ifNotExists,
                    isTemporary,
                    cols,
                    partCols,
                    comment,
                    location,
                    tblProps,
                    rowFormatParams,
                    storageFormat,
                    primaryKeys,
                    notNulls);

    return new CreateTableASOperation(
            createTableOperation,
            insertOperationInfo.f2,
            insertOperationInfo.f1,
            insertOperationInfo.f3);
}