public FileSystem create()

in fluss-filesystems/fluss-fs-hadoop/src/main/java/com/alibaba/fluss/fs/hdfs/HadoopFsPlugin.java [58:169]


    /**
     * Creates a file system for the given URI, backed by a Hadoop file system implementation.
     *
     * <p>The method proceeds in five steps: (1) derive the Hadoop configuration from the given
     * Fluss configuration, (2) look up the Hadoop file system class registered for the URI's
     * scheme, (3) instantiate that class, (4) determine the URI used for initialization, and (5)
     * initialize the Hadoop file system and wrap it in a {@code HadoopFileSystem}.
     *
     * <p>If {@code fsUri} carries no authority, the initialization URI is taken from the Hadoop
     * configuration key {@code fs.defaultFS}, falling back to the deprecated {@code
     * fs.default.name}.
     *
     * @param fsUri the URI identifying the file system; checked to be non-null and to have a
     *     scheme
     * @param flussConfig the Fluss configuration from which the Hadoop configuration is obtained
     * @return a {@code HadoopFileSystem} wrapping the initialized Hadoop file system
     * @throws IOException if no usable default file system authority can be determined, if the
     *     authority cannot be resolved, or if instantiation fails for any other reason. Problems
     *     with scheme support (no Hadoop file system class for the scheme, reflective
     *     instantiation failures, linkage errors) are reported as {@code
     *     UnsupportedFileSystemSchemeException}.
     */
    public FileSystem create(URI fsUri, Configuration flussConfig) throws IOException {
        checkNotNull(fsUri, "fsUri");

        final String scheme = fsUri.getScheme();
        checkArgument(scheme != null, "file system has null scheme");

        // from here on, we need to handle errors due to missing optional
        // dependency classes
        try {
            // -- (1) get the Hadoop config

            final org.apache.hadoop.conf.Configuration hadoopConfig =
                    HadoopUtils.getHadoopConfiguration(flussConfig);

            // -- (2) get the Hadoop file system class for that scheme

            final Class<? extends org.apache.hadoop.fs.FileSystem> fsClass;
            try {
                fsClass = org.apache.hadoop.fs.FileSystem.getFileSystemClass(scheme, hadoopConfig);
            } catch (IOException e) {
                throw new UnsupportedFileSystemSchemeException(
                        "Hadoop File System abstraction does not support scheme '"
                                + scheme
                                + "'. "
                                + "Either no file system implementation exists for that scheme, "
                                + "or the relevant classes are missing from the classpath.",
                        e);
            }

            // -- (3) instantiate the Hadoop file system

            LOG.debug(
                    "Instantiating for file system scheme {} Hadoop File System {}",
                    scheme,
                    fsClass.getName());

            // NOTE(review): Class.newInstance() is deprecated since Java 9. It is kept on purpose:
            // getDeclaredConstructor().newInstance() would wrap constructor failures in an
            // InvocationTargetException, which the ReflectiveOperationException handler below
            // would then report as a classpath problem.
            final org.apache.hadoop.fs.FileSystem hadoopFs = fsClass.newInstance();

            // -- (4) create the proper URI to initialize the file system

            final URI initUri;
            if (fsUri.getAuthority() != null) {
                initUri = fsUri;
            } else {
                LOG.debug(
                        "URI {} does not specify file system authority, trying to load default authority (fs.defaultFS)",
                        fsUri);

                String configEntry = hadoopConfig.get("fs.defaultFS", null);
                if (configEntry == null) {
                    // fs.default.name deprecated as of hadoop 2.2.0 - see
                    // http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/DeprecatedProperties.html
                    configEntry = hadoopConfig.get("fs.default.name", null);
                }

                LOG.debug("Hadoop's 'fs.defaultFS' is set to {}", configEntry);

                if (configEntry == null) {
                    throw new IOException(
                            getMissingAuthorityErrorPrefix(fsUri)
                                    + "Hadoop configuration did not contain an entry for the default file system ('fs.defaultFS').");
                } else {
                    try {
                        initUri = URI.create(configEntry);
                    } catch (IllegalArgumentException e) {
                        // keep the parse failure as the cause so the offending part of the
                        // configured value is visible in the stack trace
                        throw new IOException(
                                getMissingAuthorityErrorPrefix(fsUri)
                                        + "The configuration contains an invalid file system default name "
                                        + "('fs.default.name' or 'fs.defaultFS'): "
                                        + configEntry,
                                e);
                    }

                    if (initUri.getAuthority() == null) {
                        throw new IOException(
                                getMissingAuthorityErrorPrefix(fsUri)
                                        + "Hadoop configuration for default file system ('fs.default.name' or 'fs.defaultFS') "
                                        + "contains no valid authority component (like hdfs namenode, S3 host, etc)");
                    }
                }
            }

            // -- (5) configure the Hadoop file system

            try {
                hadoopFs.initialize(initUri, hadoopConfig);
            } catch (UnknownHostException e) {
                String message =
                        "The Hadoop file system's authority ("
                                + initUri.getAuthority()
                                + "), specified by either the file URI or the configuration, cannot be resolved.";

                throw new IOException(message, e);
            }

            return new HadoopFileSystem(hadoopFs);
        } catch (ReflectiveOperationException | LinkageError e) {
            throw new UnsupportedFileSystemSchemeException(
                    "Cannot support file system for '"
                            + scheme
                            + "' via Hadoop, because Hadoop is not in the classpath, or some classes "
                            + "are missing from the classpath.",
                    e);
        } catch (IOException e) {
            // rethrow unchanged so that I/O failures are not re-wrapped by the generic handler
            throw e;
        } catch (Exception e) {
            throw new IOException("Cannot instantiate file system for URI: " + fsUri, e);
        }
    }