in fluss-filesystems/fluss-fs-hadoop/src/main/java/com/alibaba/fluss/fs/hdfs/HadoopFsPlugin.java [58:169]
/**
 * Creates a {@link HadoopFileSystem} that wraps the Hadoop file system registered for the scheme
 * of the given URI.
 *
 * <p>The Hadoop configuration is derived from the given configuration via {@code
 * HadoopUtils.getHadoopConfiguration}. If the URI carries no authority, the authority is taken
 * from the Hadoop configuration entry {@code fs.defaultFS}, falling back to the deprecated {@code
 * fs.default.name}.
 *
 * @param fsUri the URI identifying the file system; must be non-null and have a scheme
 * @param flussConfig the configuration from which the Hadoop configuration is derived
 * @return the Hadoop file system for the URI, wrapped as a {@link HadoopFileSystem}
 * @throws UnsupportedFileSystemSchemeException if Hadoop has no file system class for the scheme,
 *     or if instantiation fails with a reflective or linkage error
 * @throws IOException if no usable default authority can be determined, if the authority cannot
 *     be resolved, or if any other failure occurs while setting up the file system
 */
public FileSystem create(URI fsUri, Configuration flussConfig) throws IOException {
    checkNotNull(fsUri, "fsUri");

    final String scheme = fsUri.getScheme();
    checkArgument(scheme != null, "file system has null scheme");

    // from here on, we need to handle errors due to missing optional
    // dependency classes
    try {
        // -- (1) get the Hadoop config
        final org.apache.hadoop.conf.Configuration hadoopConfig =
                HadoopUtils.getHadoopConfiguration(flussConfig);

        // -- (2) get the Hadoop file system class for that scheme
        final Class<? extends org.apache.hadoop.fs.FileSystem> fsClass;
        try {
            fsClass = org.apache.hadoop.fs.FileSystem.getFileSystemClass(scheme, hadoopConfig);
        } catch (IOException e) {
            throw new UnsupportedFileSystemSchemeException(
                    "Hadoop File System abstraction does not support scheme '"
                            + scheme
                            + "'. "
                            + "Either no file system implementation exists for that scheme, "
                            + "or the relevant classes are missing from the classpath.",
                    e);
        }

        // -- (3) instantiate the Hadoop file system
        LOG.debug(
                "Instantiating for file system scheme {} Hadoop File System {}",
                scheme,
                fsClass.getName());

        // NOTE(review): Class.newInstance() is deprecated since Java 9. It is kept on purpose:
        // Constructor.newInstance() would wrap constructor failures in an
        // InvocationTargetException and thereby change which catch clause below handles them.
        final org.apache.hadoop.fs.FileSystem hadoopFs = fsClass.newInstance();

        // -- (4) create the proper URI to initialize the file system
        final URI initUri;
        if (fsUri.getAuthority() != null) {
            initUri = fsUri;
        } else {
            // parameterized logging needs no explicit isDebugEnabled() guard here
            LOG.debug(
                    "URI {} does not specify file system authority, trying to load default authority (fs.defaultFS)",
                    fsUri);

            String configEntry = hadoopConfig.get("fs.defaultFS", null);
            if (configEntry == null) {
                // fs.default.name deprecated as of hadoop 2.2.0 - see
                // http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/DeprecatedProperties.html
                configEntry = hadoopConfig.get("fs.default.name", null);
            }

            LOG.debug("Hadoop's 'fs.defaultFS' is set to {}", configEntry);

            if (configEntry == null) {
                throw new IOException(
                        getMissingAuthorityErrorPrefix(fsUri)
                                + "Hadoop configuration did not contain an entry for the default file system ('fs.defaultFS').");
            } else {
                try {
                    initUri = URI.create(configEntry);
                } catch (IllegalArgumentException e) {
                    // keep the parse failure as the cause so the reason for the invalid
                    // URI is not lost
                    throw new IOException(
                            getMissingAuthorityErrorPrefix(fsUri)
                                    + "The configuration contains an invalid file system default name "
                                    + "('fs.default.name' or 'fs.defaultFS'): "
                                    + configEntry,
                            e);
                }

                if (initUri.getAuthority() == null) {
                    throw new IOException(
                            getMissingAuthorityErrorPrefix(fsUri)
                                    + "Hadoop configuration for default file system ('fs.default.name' or 'fs.defaultFS') "
                                    + "contains no valid authority component (like hdfs namenode, S3 host, etc)");
                }
            }
        }

        // -- (5) configure the Hadoop file system
        try {
            hadoopFs.initialize(initUri, hadoopConfig);
        } catch (UnknownHostException e) {
            String message =
                    "The Hadoop file system's authority ("
                            + initUri.getAuthority()
                            + "), specified by either the file URI or the configuration, cannot be resolved.";
            throw new IOException(message, e);
        }

        return new HadoopFileSystem(hadoopFs);
    } catch (ReflectiveOperationException | LinkageError e) {
        throw new UnsupportedFileSystemSchemeException(
                "Cannot support file system for '"
                        + fsUri.getScheme()
                        + "' via Hadoop, because Hadoop is not in the classpath, or some classes "
                        + "are missing from the classpath.",
                e);
    } catch (IOException e) {
        // IOExceptions already carry a specific message; pass them through unchanged
        throw e;
    } catch (Exception e) {
        throw new IOException("Cannot instantiate file system for URI: " + fsUri, e);
    }
}