in fluss-common/src/main/java/com/alibaba/fluss/fs/FileSystem.java [303:427]
/**
 * Returns the {@link FileSystem} to use for the given URI, creating it on first use and
 * serving it from {@code CACHE} afterwards.
 *
 * <p>The whole lookup runs while holding {@code LOCK}. The steps are:
 *
 * <ol>
 *   <li>If {@code fsUri} declares no scheme, it is rewritten against the local file system
 *       URI; for a {@code file} default, one further attempt is made with the path turned
 *       into an absolute path.
 *   <li>A {@code file} URI that carries a non-empty authority is rejected with a hint, since
 *       that is usually a missing slash.
 *   <li>The cache is consulted using a key built from the scheme and authority.
 *   <li>On a cache miss, the plugin registered for the scheme creates the file system, with
 *       the plugin's class loader installed as temporary context for the call.
 * </ol>
 *
 * @param fsUri the URI identifying the file system; must not be {@code null}
 * @return the cached or newly created file system for the URI's scheme and authority
 * @throws IOException if a scheme-less URI cannot be interpreted relative to the local file
 *     system URI (the underlying {@link URISyntaxException} is attached as the cause), or if
 *     a local file URI carries an authority
 * @throws UnsupportedFileSystemSchemeException if no plugin is registered for the scheme
 */
public static FileSystem getUnguardedFileSystem(final URI fsUri) throws IOException {
    checkNotNull(fsUri, "file system URI");

    return inLock(
            LOCK,
            () -> {
                final URI uri;

                if (fsUri.getScheme() != null) {
                    uri = fsUri;
                } else {
                    // use the local FileSystem URI as default
                    final URI defaultUri = LocalFileSystem.getLocalFsURI();
                    URI rewrittenUri = null;
                    // remembered so that the IOException below can report why rewriting failed
                    URISyntaxException rewriteFailure = null;

                    try {
                        rewrittenUri =
                                new URI(
                                        defaultUri.getScheme(),
                                        null,
                                        defaultUri.getHost(),
                                        defaultUri.getPort(),
                                        fsUri.getPath(),
                                        null,
                                        null);
                    } catch (URISyntaxException e) {
                        rewriteFailure = e;

                        // for local URIs, we make one more try to repair the path by making it
                        // absolute
                        if (defaultUri.getScheme().equals("file")) {
                            try {
                                rewrittenUri =
                                        new URI(
                                                "file",
                                                null,
                                                new FsPath(
                                                                new File(fsUri.getPath())
                                                                        .getAbsolutePath())
                                                        .toUri()
                                                        .getPath(),
                                                null);
                            } catch (URISyntaxException repairFailure) {
                                // could not help it... keep the second failure for diagnosis
                                e.addSuppressed(repairFailure);
                            }
                        }
                    }

                    if (rewrittenUri != null) {
                        uri = rewrittenUri;
                    } else {
                        throw new IOException(
                                "The file system URI '"
                                        + fsUri
                                        + "' declares no scheme and cannot be interpreted relative to the default file system URI ("
                                        + defaultUri
                                        + ").",
                                rewriteFailure);
                    }
                }

                // print a helpful pointer for malformed local URIs (happens a lot to new users)
                if (uri.getScheme().equals("file")
                        && uri.getAuthority() != null
                        && !uri.getAuthority().isEmpty()) {
                    String supposedUri = "file:///" + uri.getAuthority() + uri.getPath();

                    throw new IOException(
                            "Found local file path with authority '"
                                    + uri.getAuthority()
                                    + "' in path '"
                                    + uri
                                    + "'. Hint: Did you forget a slash? (correct path would be '"
                                    + supposedUri
                                    + "')");
                }

                final FSKey key = new FSKey(uri.getScheme(), uri.getAuthority());

                // See if there is a file system object in the cache
                {
                    FileSystem cached = CACHE.get(key);
                    if (cached != null) {
                        return cached;
                    }
                }

                // this "default" initialization makes sure that the FileSystem class works
                // even when not configured with an explicit Fluss configuration
                if (FS_PLUGINS.isEmpty()) {
                    initializeWithoutPlugins(new Configuration());
                }

                // Try to create a new file system
                final FileSystem fs;
                final FileSystemPlugin fileSystemPlugin = FS_PLUGINS.get(uri.getScheme());

                if (fileSystemPlugin != null) {
                    // create the file system with the plugin's class loader as context
                    ClassLoader classLoader = fileSystemPlugin.getClassLoader();
                    try (TemporaryClassLoaderContext ignored =
                            TemporaryClassLoaderContext.of(classLoader)) {
                        fs = fileSystemPlugin.create(uri, configuration);
                    }
                } else {
                    if (DIRECTLY_SUPPORTED_FILESYSTEM.containsKey(uri.getScheme())) {
                        // the scheme is known, but none of its plugins was loaded
                        final Collection<String> plugins =
                                DIRECTLY_SUPPORTED_FILESYSTEM.get(uri.getScheme());
                        // todo: may need to add a pointer to plugin documentation to this
                        // message, as Flink does with
                        // https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/plugins/
                        throw new UnsupportedFileSystemSchemeException(
                                String.format(
                                        "Could not find a file system implementation for scheme '%s'. File system schemes "
                                                + "are supported by Fluss through the following plugin(s): %s. "
                                                + "No file system to support this scheme could be loaded. Please ensure that each plugin is "
                                                + "configured properly and resides within its own subfolder in the plugins directory. ",
                                        uri.getScheme(), String.join(", ", plugins)));
                    } else {
                        throw new UnsupportedFileSystemSchemeException(
                                "Could not find a file system implementation for scheme '"
                                        + uri.getScheme()
                                        + "'. The scheme is not directly supported by Fluss.");
                    }
                }

                CACHE.put(key, fs);
                return fs;
            });
}