in worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java [203:304]
/**
 * Returns a {@link FileInfo} restricted to the map range between {@code startMapIndex} and
 * {@code endMapIndex}, sorting the underlying partition file first if needed.
 *
 * <p>For a {@link MemoryFileInfo}, {@code sortMemoryShuffleFile} is invoked before the sorted
 * indexes are read. The result is a new {@code MemoryFileInfo} whose composite buffer holds the
 * slices of the sorted buffer for the requested map range.
 *
 * <p>For any other {@link FileInfo} (cast to {@link DiskFileInfo}):
 * <ul>
 *   <li>A {@code FileSorter} is enqueued unless the file is already sorted or being sorted.
 *   <li>The caller then polls every 50 ms until the file appears in the sorted set, sorting is
 *       observed to have failed, or {@code sortTimeout} ms elapse.
 *   <li>Finally, the result is obtained from {@code resolve}.
 * </ul>
 *
 * @param shuffleKey shuffle the file belongs to; key into the per-shuffle sorted/sorting sets
 * @param fileName partition file name, combined with {@code shuffleKey} to form the sort id
 * @param fileInfo file to read; either a {@link MemoryFileInfo} or a {@link DiskFileInfo}
 * @param startMapIndex start of the requested map range
 * @param endMapIndex end of the requested map range
 * @return file info covering only the requested map range
 * @throws IOException if the sorter cannot be created, the sort fails or times out, or the
 *     thread is interrupted while scheduling or waiting (interrupt status is restored)
 */
public FileInfo getSortedFileInfo(
    String shuffleKey, String fileName, FileInfo fileInfo, int startMapIndex, int endMapIndex)
    throws IOException {
  if (fileInfo instanceof MemoryFileInfo) {
    MemoryFileInfo memoryFileInfo = (MemoryFileInfo) fileInfo;
    sortMemoryShuffleFile(memoryFileInfo);
    Map<Integer, List<ShuffleBlockInfo>> indexesMap = memoryFileInfo.getSortedIndexes();
    ReduceFileMeta reduceFileMeta =
        new ReduceFileMeta(
            ShuffleBlockInfoUtils.getChunkOffsetsFromShuffleBlockInfos(
                startMapIndex, endMapIndex, shuffleChunkSize, indexesMap, true),
            shuffleChunkSize);
    // NOTE(review): targetBuffer is not released if slicing below throws. Whether releasing it
    // is safe depends on whether sliceSortedBufferByMapRange adds retained slices of the shared
    // sorted buffer -- confirm before adding failure cleanup.
    CompositeByteBuf targetBuffer =
        MemoryManager.instance().getStorageByteBufAllocator().compositeBuffer(Integer.MAX_VALUE);
    ShuffleBlockInfoUtils.sliceSortedBufferByMapRange(
        startMapIndex,
        endMapIndex,
        indexesMap,
        memoryFileInfo.getSortedBuffer(),
        targetBuffer,
        shuffleChunkSize);
    return new MemoryFileInfo(
        memoryFileInfo.getUserIdentifier(),
        memoryFileInfo.isPartitionSplitEnabled(),
        reduceFileMeta,
        targetBuffer);
  }

  DiskFileInfo diskFileInfo = (DiskFileInfo) fileInfo;
  String fileId = shuffleKey + "-" + fileName;
  UserIdentifier userIdentifier = diskFileInfo.getUserIdentifier();
  Set<String> sorted =
      sortedShuffleFiles.computeIfAbsent(shuffleKey, v -> ConcurrentHashMap.newKeySet());
  Set<String> sorting =
      sortingShuffleFiles.computeIfAbsent(shuffleKey, v -> ConcurrentHashMap.newKeySet());
  String sortedFilePath = Utils.getSortedFilePath(diskFileInfo.getFilePath());
  String indexFilePath = Utils.getIndexFilePath(diskFileInfo.getFilePath());

  boolean fileSorting = true;
  // Check-and-enqueue under the per-shuffle `sorting` monitor so that concurrent callers of
  // this method do not both enqueue a sorter for the same file.
  synchronized (sorting) {
    if (sorted.contains(fileId)) {
      fileSorting = false;
    } else if (!sorting.contains(fileId)) {
      try {
        FileSorter fileSorter = new FileSorter(diskFileInfo, fileId, shuffleKey);
        sorting.add(fileId);
        logger.debug(
            "Adding sorter to sort queue shuffle key {}, file name {}", shuffleKey, fileName);
        try {
          shuffleSortTaskDeque.put(fileSorter);
        } catch (InterruptedException e) {
          // The sorter never reached the queue; undo the marker so later callers do not poll
          // until sortTimeout for a sort that will never run.
          sorting.remove(fileId);
          throw e;
        }
      } catch (InterruptedException e) {
        // Restore the interrupt flag (cleared by the catch) so the shutdown signal survives.
        Thread.currentThread().interrupt();
        logger.error("Sorter scheduler thread is interrupted means worker is shutting down.", e);
        throw new IOException(
            "Sort scheduler thread is interrupted means worker is shutting down.", e);
      } catch (IOException e) {
        logger.error("File sorter access DFS failed.", e);
        throw new IOException("File sorter access DFS failed.", e);
      }
    }
  }

  if (fileSorting) {
    long sortStartTime = System.currentTimeMillis();
    while (!sorted.contains(fileId)) {
      if (sorting.contains(fileId)) {
        try {
          Thread.sleep(50);
        } catch (InterruptedException e) {
          // Restore the interrupt flag (cleared by the catch) so the shutdown signal survives.
          Thread.currentThread().interrupt();
          logger.error("Sorter scheduler thread is interrupted means worker is shutting down.", e);
          throw new IOException(
              "Sorter scheduler thread is interrupted means worker is shutting down.", e);
        }
        if (System.currentTimeMillis() - sortStartTime > sortTimeout) {
          logger.error("Sorting file {} timeout after {}ms", fileId, sortTimeout);
          throw new IOException(
              "Sort file " + diskFileInfo.getFilePath() + " timeout after " + sortTimeout);
        }
      } else if (!sorted.contains(fileId)) {
        // Re-check `sorted`: the two membership tests above are not atomic. A sort that
        // completed between them would otherwise be misreported as a failure. (Presumably the
        // sorter adds to `sorted` before removing from `sorting` -- confirm in FileSorter.)
        logger.debug(
            "Sorting shuffle file for {} {} failed.", shuffleKey, diskFileInfo.getFilePath());
        throw new IOException(
            "Sorting shuffle file for "
                + shuffleKey
                + " "
                + diskFileInfo.getFilePath()
                + " failed.");
      }
    }
  }

  return resolve(
      shuffleKey,
      fileId,
      userIdentifier,
      sortedFilePath,
      indexFilePath,
      startMapIndex,
      endMapIndex);
}