in flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserLoadSemanticAnalyzer.java [89:189]
public Operation convertToOperation(HiveParserASTNode ast) throws SemanticException {
    boolean isLocal = false;
    boolean isOverWrite = false;
    Tree fromTree = ast.getChild(0);
    HiveParserASTNode tableTree = (HiveParserASTNode) ast.getChild(1);
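    // the optional LOCAL and OVERWRITE keywords show up as extra AST children after the
    // source path and target table, so the child count tells us which flags are present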
    if (ast.getChildCount() == 4) {
        isLocal = true;
        isOverWrite = true;
    }
    if (ast.getChildCount() == 3) {
        if (ast.getChild(2).getText().equalsIgnoreCase("local")) {
            isLocal = true;
        } else {
            isOverWrite = true;
        }
    }
    // initialize load path
    URI fromURI;
    try {
        String fromPath = stripQuotes(fromTree.getText());
        fromURI = initializeFromURI(fromPath, isLocal);
    } catch (IOException | URISyntaxException e) {
        throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(fromTree, e.getMessage()), e);
    }
    // initialize destination table/partition
    TableSpec ts = new TableSpec(catalogRegistry, conf, tableTree, frameworkConfig, cluster);
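    // the target must be a Hive table, and it must live in the catalog that is currently active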
    if (!HiveCatalog.isHiveTable(ts.table.getOptions())) {
        throw new UnsupportedOperationException(
                "Loading data into a non-Hive table is not supported yet.");
    }
    if (!ts.tableIdentifier.getCatalogName().equals(catalogRegistry.getCurrentCatalog())) {
        throw new UnsupportedOperationException(
                String.format(
                        "Loading data into a table that isn't in the current catalog is not supported yet."
                                + " The table's catalog is %s, but the current catalog is %s.",
                        ts.tableIdentifier.getCatalogName(),
                        catalogRegistry.getCurrentCatalog()));
    }
    Table table;
    try {
        table =
                db.getTable(
                        ts.tableIdentifier.getDatabaseName(),
                        ts.tableIdentifier.getObjectName());
    } catch (HiveException e) {
        throw new FlinkHiveException(
                String.format("Failed to get table %s.", ts.tableIdentifier.asSummaryString()),
                e);
    }
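    // LOAD DATA can only target a plain native Hive table: views, storage-handler tables,
    // and tables stored as sub-directories are rejected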
    if (table.isView() || table.isMaterializedView()) {
        throw new SemanticException(ErrorMsg.DML_AGAINST_VIEW.getMsg());
    }
    if (table.isNonNative()) {
        throw new SemanticException(ErrorMsg.LOAD_INTO_NON_NATIVE.getMsg());
    }
    if (table.isStoredAsSubDirectories()) {
        throw new SemanticException(ErrorMsg.LOAD_INTO_STORED_AS_DIR.getMsg());
    }
    List<FieldSchema> parts = table.getPartitionKeys();
    if ((parts != null && parts.size() > 0)
            && (ts.partSpec == null || ts.partSpec.size() == 0)) {
        throw new SemanticException(ErrorMsg.NEED_PARTITION_ERROR.getMsg());
    }
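    // raw file loads cannot guarantee bucketing, so strict checks may reject bucketed tables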
    List<String> bucketCols = table.getBucketCols();
    if (bucketCols != null && !bucketCols.isEmpty()) {
        String error = HiveConf.StrictChecks.checkBucketing(conf);
        if (error != null) {
            throw new SemanticException(
                    "Please load into an intermediate table"
                            + " and use 'insert... select' to allow Hive to enforce bucketing. "
                            + error);
        }
    }
    // make sure the arguments make sense; "LOAD DATA" may need to be rewritten as
    // "INSERT AS SELECT" when the fromURI points at any directory
    List<FileStatus> files = applyConstraintsAndGetFiles(fromURI, fromTree, isLocal);
    // for managed tables, make sure the file formats match
    if (TableType.MANAGED_TABLE.equals(table.getTableType())
            && conf.getBoolVar(HiveConf.ConfVars.HIVECHECKFILEFORMAT)) {
        ensureFileFormatsMatch(ts, table, files, fromURI);
    }
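    // bundle the validated arguments into a HiveLoadDataOperation; an empty partition spec
    // means a non-partitioned load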
    HiveLoadDataOperation hiveLoadDataOperation =
            new HiveLoadDataOperation(
                    new Path(fromURI),
                    new ObjectPath(table.getDbName(), table.getTableName()),
                    isOverWrite,
                    isLocal,
                    ts.partSpec == null ? new LinkedHashMap<>() : ts.partSpec);
    return new HiveExecutableOperation(hiveLoadDataOperation);
}
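
For context, a minimal sketch of how this code path is typically reached: issuing a Hive-dialect LOAD DATA statement against a TableEnvironment backed by a HiveCatalog. The catalog name, hive-conf directory, file path, table name, and partition value below are illustrative assumptions, not values taken from this file.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class LoadDataExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // hypothetical catalog setup; the hive-conf dir must contain a valid hive-site.xml
        HiveCatalog hiveCatalog = new HiveCatalog("myhive", "default", "/opt/hive-conf");
        tEnv.registerCatalog("myhive", hiveCatalog);
        tEnv.useCatalog("myhive");

        // LOAD DATA is only parsed under the Hive dialect, which routes the statement
        // through HiveParserLoadSemanticAnalyzer.convertToOperation shown above
        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tEnv.executeSql(
                "LOAD DATA LOCAL INPATH '/tmp/users.txt' "
                        + "OVERWRITE INTO TABLE users PARTITION (dt='2024-01-01')");
    }
}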