in paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java [427:586]
static FileIO get(Path path, CatalogContext config) throws IOException {
if (config.options().get(RESOLVING_FILE_IO_ENABLED)) {
FileIO fileIO = new ResolvingFileIO();
fileIO.configure(config);
return fileIO;
}
URI uri = path.toUri();
if (LOG.isDebugEnabled()) {
LOG.debug("Getting FileIO by scheme {}.", uri.getScheme());
}
if (uri.getScheme() == null) {
return new LocalFileIO();
}
// print a helpful pointer for malformed local URIs (happens a lot to new users)
if (uri.getScheme().equals("file")
&& uri.getAuthority() != null
&& !uri.getAuthority().isEmpty()) {
String supposedUri = "file:///" + uri.getAuthority() + uri.getPath();
throw new IOException(
"Found local file path with authority '"
+ uri.getAuthority()
+ "' in path '"
+ uri
+ "'. Hint: Did you forget a slash? (correct path would be '"
+ supposedUri
+ "')");
}
FileIOLoader loader = null;
List<IOException> ioExceptionList = new ArrayList<>();
// load preferIO
FileIOLoader preferIOLoader = config.preferIO();
try {
loader = checkAccess(preferIOLoader, path, config);
if (loader != null && LOG.isDebugEnabled()) {
LOG.debug(
"Found preferIOLoader {} with scheme {}.",
loader.getClass().getName(),
loader.getScheme());
}
} catch (IOException ioException) {
ioExceptionList.add(ioException);
}
if (loader == null) {
Map<String, FileIOLoader> loaders = discoverLoaders();
loader = loaders.get(uri.getScheme());
if (!loaders.isEmpty() && LOG.isDebugEnabled()) {
LOG.debug(
"Discovered FileIOLoaders: {}.",
loaders.entrySet().stream()
.map(
e ->
String.format(
"{%s,%s}",
e.getKey(),
e.getValue().getClass().getName()))
.collect(Collectors.joining(",")));
}
}
// load fallbackIO
FileIOLoader fallbackIO = config.fallbackIO();
if (loader != null) {
Set<String> options =
config.options().keySet().stream()
.map(String::toLowerCase)
.collect(Collectors.toSet());
Set<String> missOptions = new HashSet<>();
for (String[] keys : loader.requiredOptions()) {
boolean found = false;
for (String key : keys) {
if (options.contains(key.toLowerCase())) {
found = true;
break;
}
}
if (!found) {
missOptions.add(keys[0]);
}
}
if (missOptions.size() > 0) {
IOException exception =
new IOException(
String.format(
"One or more required options are missing.\n\n"
+ "Missing required options are:\n\n"
+ "%s",
String.join("\n", missOptions)));
ioExceptionList.add(exception);
if (LOG.isDebugEnabled()) {
LOG.debug(
"Got {} but miss options. Will try to get fallback IO and Hadoop IO respectively.",
loader.getClass().getName());
}
loader = null;
}
}
if (loader == null) {
try {
loader = checkAccess(fallbackIO, path, config);
if (loader != null && LOG.isDebugEnabled()) {
LOG.debug("Got fallback FileIOLoader: {}.", loader.getClass().getName());
}
} catch (IOException ioException) {
ioExceptionList.add(ioException);
}
}
// load hadoopIO
if (loader == null) {
try {
loader = checkAccess(new HadoopFileIOLoader(), path, config);
if (loader != null && LOG.isDebugEnabled()) {
LOG.debug("Got hadoop FileIOLoader: {}.", loader.getClass().getName());
}
} catch (IOException ioException) {
ioExceptionList.add(ioException);
}
}
if (loader == null) {
String fallbackMsg = "";
String preferMsg = "";
if (preferIOLoader != null) {
preferMsg =
" "
+ preferIOLoader.getClass().getSimpleName()
+ " also cannot access this path.";
}
if (fallbackIO != null) {
fallbackMsg =
" "
+ fallbackIO.getClass().getSimpleName()
+ " also cannot access this path.";
}
UnsupportedSchemeException ex =
new UnsupportedSchemeException(
String.format(
"Could not find a file io implementation for scheme '%s' in the classpath."
+ "%s %s Hadoop FileSystem also cannot access this path '%s'.",
uri.getScheme(), preferMsg, fallbackMsg, path));
for (IOException ioException : ioExceptionList) {
ex.addSuppressed(ioException);
}
throw ex;
}
FileIO fileIO = loader.load(path);
fileIO.configure(config);
return fileIO;
}