public static FileSystem getUnguardedFileSystem()

in fluss-common/src/main/java/com/alibaba/fluss/fs/FileSystem.java [303:427]


    /**
     * Returns the {@link FileSystem} for the given URI, looking it up in the cache first and
     * creating it through the plugin registered for the URI's scheme otherwise.
     *
     * <p>The whole lookup runs while holding {@code LOCK}. A URI without a scheme is rewritten
     * against the local file system URI; if that rewrite fails and the default scheme is {@code
     * file}, one more attempt is made with the path turned into an absolute path. File systems are
     * cached per (scheme, authority) pair.
     *
     * @param fsUri the URI identifying the file system; validated with {@code checkNotNull}
     * @return the cached file system for the URI's scheme and authority, or a newly created one
     *     that has been added to the cache
     * @throws IOException if the URI declares no scheme and cannot be interpreted relative to the
     *     local file system URI (the underlying {@link URISyntaxException} is attached as the
     *     cause), or if a {@code file} URI carries a non-empty authority
     * @throws UnsupportedFileSystemSchemeException if no file system plugin is registered for the
     *     URI's scheme
     */
    public static FileSystem getUnguardedFileSystem(final URI fsUri) throws IOException {
        checkNotNull(fsUri, "file system URI");
        return inLock(
                LOCK,
                () -> {
                    final URI uri;

                    if (fsUri.getScheme() != null) {
                        uri = fsUri;
                    } else {
                        // use local FileSystem URI as default
                        final URI defaultUri = LocalFileSystem.getLocalFsURI();
                        URI rewrittenUri = null;
                        // remembered so that the IOException below can report why the rewrite
                        // failed instead of silently dropping the parser's diagnosis
                        URISyntaxException rewriteFailure = null;

                        try {
                            rewrittenUri =
                                    new URI(
                                            defaultUri.getScheme(),
                                            null,
                                            defaultUri.getHost(),
                                            defaultUri.getPort(),
                                            fsUri.getPath(),
                                            null,
                                            null);
                        } catch (URISyntaxException e) {
                            rewriteFailure = e;
                            // for local URIs, we make one more try to repair the path by making it
                            // absolute
                            if (defaultUri.getScheme().equals("file")) {
                                try {
                                    rewrittenUri =
                                            new URI(
                                                    "file",
                                                    null,
                                                    new FsPath(
                                                                    new File(fsUri.getPath())
                                                                            .getAbsolutePath())
                                                            .toUri()
                                                            .getPath(),
                                                    null);
                                } catch (URISyntaxException repairFailure) {
                                    // could not help it; keep the second failure for diagnostics
                                    rewriteFailure.addSuppressed(repairFailure);
                                }
                            }
                        }

                        if (rewrittenUri == null) {
                            throw new IOException(
                                    "The file system URI '"
                                            + fsUri
                                            + "' declares no scheme and cannot be interpreted relative to the default file system URI ("
                                            + defaultUri
                                            + ").",
                                    rewriteFailure);
                        }
                        uri = rewrittenUri;
                    }

                    // print a helpful pointer for malformed local URIs (happens a lot to new users)
                    if (uri.getScheme().equals("file")
                            && uri.getAuthority() != null
                            && !uri.getAuthority().isEmpty()) {
                        String supposedUri = "file:///" + uri.getAuthority() + uri.getPath();

                        throw new IOException(
                                "Found local file path with authority '"
                                        + uri.getAuthority()
                                        + "' in path '"
                                        + uri
                                        + "'. Hint: Did you forget a slash? (correct path would be '"
                                        + supposedUri
                                        + "')");
                    }

                    final FSKey key = new FSKey(uri.getScheme(), uri.getAuthority());

                    // See if there is a file system object in the cache
                    {
                        FileSystem cached = CACHE.get(key);
                        if (cached != null) {
                            return cached;
                        }
                    }

                    // this "default" initialization makes sure that the FileSystem class works
                    // even when not configured with an explicit Fluss configuration
                    if (FS_PLUGINS.isEmpty()) {
                        initializeWithoutPlugins(new Configuration());
                    }

                    // Try to create a new file system
                    final FileSystem fs;
                    final FileSystemPlugin fileSystemPlugin = FS_PLUGINS.get(uri.getScheme());

                    if (fileSystemPlugin != null) {
                        // create the file system with the plugin's class loader installed via
                        // TemporaryClassLoaderContext; the context is closed again right after
                        ClassLoader classLoader = fileSystemPlugin.getClassLoader();
                        try (TemporaryClassLoaderContext ignored =
                                TemporaryClassLoaderContext.of(classLoader)) {
                            fs = fileSystemPlugin.create(uri, configuration);
                        }
                    } else {
                        if (DIRECTLY_SUPPORTED_FILESYSTEM.containsKey(uri.getScheme())) {
                            // the scheme is known, but none of its plugins was loaded
                            final Collection<String> plugins =
                                    DIRECTLY_SUPPORTED_FILESYSTEM.get(uri.getScheme());
                            // todo: may need to add message like flink
                            // "See
                            // https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/plugins/ "
                            // for more information.",
                            throw new UnsupportedFileSystemSchemeException(
                                    String.format(
                                            "Could not find a file system implementation for scheme '%s'. File system schemes "
                                                    + "are supported by Fluss through the following plugin(s): %s. "
                                                    + "No file system to support this scheme could be loaded. Please ensure that each plugin is "
                                                    + "configured properly and resides within its own subfolder in the plugins directory. ",
                                            uri.getScheme(), String.join(", ", plugins)));
                        } else {
                            throw new UnsupportedFileSystemSchemeException(
                                    "Could not find a file system implementation for scheme '"
                                            + uri.getScheme()
                                            + "'. The scheme is not directly supported by Fluss.");
                        }
                    }
                    CACHE.put(key, fs);
                    return fs;
                });
    }