public Operation convertToOperation()

in flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserLoadSemanticAnalyzer.java [89:189]


    public Operation convertToOperation(HiveParserASTNode ast) throws SemanticException {
        boolean isLocal = false;
        boolean isOverWrite = false;
        Tree fromTree = ast.getChild(0);
        HiveParserASTNode tableTree = (HiveParserASTNode) ast.getChild(1);

        if (ast.getChildCount() == 4) {
            isLocal = true;
            isOverWrite = true;
        }

        if (ast.getChildCount() == 3) {
            if (ast.getChild(2).getText().equalsIgnoreCase("local")) {
                isLocal = true;
            } else {
                isOverWrite = true;
            }
        }

        // initialize load path
        URI fromURI;
        try {
            String fromPath = stripQuotes(fromTree.getText());
            fromURI = initializeFromURI(fromPath, isLocal);
        } catch (IOException | URISyntaxException e) {
            throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(fromTree, e.getMessage()), e);
        }

        // initialize destination table/partition
        TableSpec ts = new TableSpec(catalogRegistry, conf, tableTree, frameworkConfig, cluster);
        if (!HiveCatalog.isHiveTable(ts.table.getOptions())) {
            throw new UnsupportedOperationException(
                    "Load data into non-hive table is not supported yet.");
        }
        if (!ts.tableIdentifier.getCatalogName().equals(catalogRegistry.getCurrentCatalog())) {
            throw new UnsupportedOperationException(
                    String.format(
                            "Load data into a table which isn't in current catalog is not supported yet."
                                    + " The table's catalog is %s, but the current catalog is %s.",
                            ts.tableIdentifier.getCatalogName(),
                            catalogRegistry.getCurrentCatalog()));
        }
        Table table;
        try {
            table =
                    db.getTable(
                            ts.tableIdentifier.getDatabaseName(),
                            ts.tableIdentifier.getObjectName());
        } catch (HiveException e) {
            throw new FlinkHiveException(
                    String.format("Fail to get table %s.", ts.tableIdentifier.asSummaryString()),
                    e);
        }

        if (table.isView() || table.isMaterializedView()) {
            throw new SemanticException(ErrorMsg.DML_AGAINST_VIEW.getMsg());
        }
        if (table.isNonNative()) {
            throw new SemanticException(ErrorMsg.LOAD_INTO_NON_NATIVE.getMsg());
        }

        if (table.isStoredAsSubDirectories()) {
            throw new SemanticException(ErrorMsg.LOAD_INTO_STORED_AS_DIR.getMsg());
        }

        List<FieldSchema> parts = table.getPartitionKeys();
        if ((parts != null && parts.size() > 0)
                && (ts.partSpec == null || ts.partSpec.size() == 0)) {
            throw new SemanticException(ErrorMsg.NEED_PARTITION_ERROR.getMsg());
        }

        List<String> bucketCols = table.getBucketCols();
        if (bucketCols != null && !bucketCols.isEmpty()) {
            String error = HiveConf.StrictChecks.checkBucketing(conf);
            if (error != null) {
                throw new SemanticException(
                        "Please load into an intermediate table"
                                + " and use 'insert... select' to allow Hive to enforce bucketing. "
                                + error);
            }
        }

        // make sure the arguments make sense, may need to write "LOAD DATA" to "INSERT AS SELECT"
        // when there's any directory in the fromURL
        List<FileStatus> files = applyConstraintsAndGetFiles(fromURI, fromTree, isLocal);

        // for managed tables, make sure the file formats match
        if (TableType.MANAGED_TABLE.equals(table.getTableType())
                && conf.getBoolVar(HiveConf.ConfVars.HIVECHECKFILEFORMAT)) {
            ensureFileFormatsMatch(ts, table, files, fromURI);
        }

        HiveLoadDataOperation hiveLoadDataOperation =
                new HiveLoadDataOperation(
                        new Path(fromURI),
                        new ObjectPath(table.getDbName(), table.getTableName()),
                        isOverWrite,
                        isLocal,
                        ts.partSpec == null ? new LinkedHashMap<>() : ts.partSpec);
        return new HiveExecutableOperation(hiveLoadDataOperation);
    }