in storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopClientReadHandler.java [123:188]
/**
 * Scans the given shuffle directory for index files and creates one
 * {@link HadoopShuffleReadHandler} per index file found.
 *
 * <p>Best-effort: a missing directory (data never flushed here) or a listing
 * failure is logged and results in no handlers, not an exception. The created
 * handlers are shuffled so concurrent readers spread load across files.
 *
 * @param fullShufflePath base directory holding this partition's shuffle files
 * @throws RssException if a FileSystem cannot be obtained for the path
 */
protected void init(String fullShufflePath) {
  Path baseFolder = new Path(fullShufflePath);
  FileSystem fs;
  try {
    fs = HadoopFilesystemProvider.getFilesystem(baseFolder, hadoopConf);
  } catch (Exception e) {
    // Chain the cause — the original code dropped it, hiding the real failure.
    throw new RssException("Can't get FileSystem for " + baseFolder, e);
  }

  FileStatus[] indexFiles;
  try {
    // List all index files; when shuffleServerId is set, restrict to that
    // server's output (file names are prefixed with the server id).
    indexFiles =
        fs.listStatus(
            baseFolder,
            file ->
                file.getName().endsWith(Constants.SHUFFLE_INDEX_FILE_SUFFIX)
                    && (shuffleServerId == null || file.getName().startsWith(shuffleServerId)));
  } catch (FileNotFoundException e) {
    // The directory may legitimately not exist yet: nothing was flushed here.
    LOG.info(
        "Directory["
            + baseFolder
            + "] not found. The data may not be flushed to this directory. Nothing will be read.");
    return;
  } catch (Exception e) {
    // Best-effort: log and read nothing rather than failing the caller.
    LOG.error("Can't list index file in " + baseFolder, e);
    return;
  }

  if (indexFiles == null || indexFiles.length == 0) {
    return;
  }
  for (FileStatus status : indexFiles) {
    LOG.info(
        "Find index file for shuffleId["
            + shuffleId
            + "], partitionId["
            + partitionId
            + "] "
            + status.getPath());
    String filePrefix = getFileNamePrefix(status.getPath().toUri().toString());
    try {
      HadoopShuffleReadHandler handler =
          new HadoopShuffleReadHandler(
              appId,
              shuffleId,
              partitionId,
              filePrefix,
              readBufferSize,
              expectBlockIds,
              processBlockIds,
              hadoopConf,
              distributionType,
              expectTaskIds,
              offHeapEnable);
      readHandlers.add(handler);
    } catch (Exception e) {
      // A single unreadable file should not abort the whole partition read.
      LOG.warn("Can't create ShuffleReaderHandler for " + filePrefix, e);
    }
  }
  // Randomize read order so concurrent consumers do not all hit the same file.
  Collections.shuffle(readHandlers);
  LOG.info(
      "Reading order of Hadoop files with name prefix: {}",
      readHandlers.stream().map(x -> x.filePrefix).collect(Collectors.toList()));
}