in worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java [162:233]
/**
 * Resolves the sorted view of a shuffle partition file, scheduling a sort task first when the
 * file is neither already sorted nor currently being sorted, and then blocking the caller until
 * the sort is finished.
 *
 * <p>Steps:
 *
 * <ol>
 *   <li>Under the per-shuffle {@code sorting} set's monitor: if the file is already recorded as
 *       sorted, delegate to {@code resolve(...)} immediately; otherwise, if no sort is in
 *       flight for it, mark it as sorting and enqueue a {@code FileSorter} task.
 *   <li>Outside the monitor: poll every 50 ms until the file id appears in the sorted set,
 *       failing if the id disappears from the sorting set without becoming sorted, or if
 *       {@code sortTimeout} milliseconds elapse.
 *   <li>Delegate to {@code resolve(...)} with the sorted-file and index-file paths.
 * </ol>
 *
 * @param shuffleKey key of the shuffle the file belongs to; also selects the per-shuffle
 *     bookkeeping sets
 * @param fileName name of the partition file; combined with {@code shuffleKey} to form the id
 *     tracked in the sorted/sorting sets
 * @param fileInfo descriptor of the unsorted file (supplies the file path and user identifier)
 * @param startMapIndex forwarded unchanged to {@code resolve(...)}; presumably the lower bound
 *     of the requested map index range - confirm against {@code resolve}
 * @param endMapIndex forwarded unchanged to {@code resolve(...)}; presumably the upper bound of
 *     the requested map index range - confirm against {@code resolve}
 * @return whatever {@code resolve(...)} returns for the sorted file
 * @throws IOException if the sorter cannot be created, the calling thread is interrupted while
 *     enqueueing or waiting (the interrupt flag is restored before throwing), the sort is no
 *     longer in flight without having completed, or the wait exceeds {@code sortTimeout}
 */
public FileInfo getSortedFileInfo(
    String shuffleKey, String fileName, FileInfo fileInfo, int startMapIndex, int endMapIndex)
    throws IOException {
  String fileId = shuffleKey + "-" + fileName;
  UserIdentifier userIdentifier = fileInfo.getUserIdentifier();
  Set<String> sorted =
      sortedShuffleFiles.computeIfAbsent(shuffleKey, v -> ConcurrentHashMap.newKeySet());
  Set<String> sorting =
      sortingShuffleFiles.computeIfAbsent(shuffleKey, v -> ConcurrentHashMap.newKeySet());
  String sortedFilePath = Utils.getSortedFilePath(fileInfo.getFilePath());
  String indexFilePath = Utils.getIndexFilePath(fileInfo.getFilePath());

  // The check-then-schedule sequence is serialized on the per-shuffle "sorting" set so that
  // concurrent callers for the same file enqueue at most one sorter.
  synchronized (sorting) {
    if (sorted.contains(fileId)) {
      return resolve(
          shuffleKey,
          fileId,
          userIdentifier,
          sortedFilePath,
          indexFilePath,
          startMapIndex,
          endMapIndex);
    }
    if (!sorting.contains(fileId)) {
      try {
        FileSorter fileSorter = new FileSorter(fileInfo, fileId, shuffleKey);
        sorting.add(fileId);
        shuffleSortTaskDeque.put(fileSorter);
      } catch (InterruptedException e) {
        // put() was interrupted, so no task was enqueued. Undo the "sorting" mark; otherwise
        // later callers would skip scheduling and poll for a sort that was never queued.
        sorting.remove(fileId);
        // Preserve the interrupt for code further up the stack.
        Thread.currentThread().interrupt();
        logger.info("Sorter scheduler thread is interrupted means worker is shutting down.");
        throw new IOException(
            "Sort scheduler thread is interrupted means worker is shutting down.", e);
      } catch (IOException e) {
        // Thrown before sorting.add(fileId) is reached, so there is no mark to roll back.
        logger.error("File sorter access HDFS failed.", e);
        throw new IOException("File sorter access HDFS failed.", e);
      }
    }
  }

  long sortStartTime = System.currentTimeMillis();
  while (!sorted.contains(fileId)) {
    if (!sorting.contains(fileId)) {
      // The "sorted" and "sorting" reads are not atomic with respect to each other: a sort
      // that completes between the loop condition and the check above would look like a
      // failure. Re-check "sorted" before giving up.
      // NOTE(review): assumes the sorter publishes to "sorted" before removing the id from
      // "sorting" - confirm against FileSorter.
      if (sorted.contains(fileId)) {
        break;
      }
      logger.debug("Sorting shuffle file for {} {} failed.", shuffleKey, fileInfo.getFilePath());
      throw new IOException(
          "Sorting shuffle file for " + shuffleKey + " " + fileInfo.getFilePath() + " failed.");
    }
    try {
      Thread.sleep(50);
    } catch (InterruptedException e) {
      // Preserve the interrupt for code further up the stack.
      Thread.currentThread().interrupt();
      logger.error("Sorter scheduler thread is interrupted means worker is shutting down.", e);
      throw new IOException(
          "Sorter scheduler thread is interrupted means worker is shutting down.", e);
    }
    if (System.currentTimeMillis() - sortStartTime > sortTimeout) {
      logger.error("Sorting file {} timeout after {}ms", fileId, sortTimeout);
      throw new IOException(
          "Sort file " + fileInfo.getFilePath() + " timeout after " + sortTimeout);
    }
  }

  return resolve(
      shuffleKey,
      fileId,
      userIdentifier,
      sortedFilePath,
      indexFilePath,
      startMapIndex,
      endMapIndex);
}