in server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java [207:260]
/**
 * Picks the local storage that the given flush event should be written to.
 *
 * <p>Resolution order:
 *
 * <ol>
 *   <li>With exactly one configured storage, that storage is always returned.
 *   <li>Otherwise, a storage already cached for the (appId, shuffleId, startPartition) key is
 *       reused as long as it is not corrupted.
 *   <li>Otherwise, a storage is chosen among those that are writable and not corrupted, and the
 *       cache entry for the key is refreshed.
 * </ol>
 *
 * <p>Along the way the event's under-storage is set to the returned storage when the event has
 * none yet; on the refresh path it is also overwritten whenever the cache entry is replaced.
 *
 * @param event the flush event to place; its under-storage may be updated as a side effect
 * @return the storage to use, or {@code null} when no storage is both writable and uncorrupted
 */
public Storage selectStorage(ShuffleDataFlushEvent event) {
  // Single-disk deployment: nothing to choose between.
  if (localStorages.size() == 1) {
    LocalStorage onlyStorage = localStorages.get(0);
    if (event.getUnderStorage() == null) {
      event.setUnderStorage(onlyStorage);
    }
    return onlyStorage;
  }

  String appId = event.getAppId();
  int shuffleId = event.getShuffleId();
  int partitionId = event.getStartPartition();
  String cacheKey = UnionKey.buildKey(appId, shuffleId, partitionId);

  // Fast path: reuse the storage previously assigned to this partition while it is healthy.
  LocalStorage cached = sortedPartitionsOfStorageMap.get(cacheKey);
  if (cached != null && !cached.isCorrupted()) {
    if (event.getUnderStorage() == null) {
      event.setUnderStorage(cached);
    }
    return cached;
  }

  // The cached storage went bad; if it already holds a write handler for this partition,
  // report it before moving the partition elsewhere.
  if (cached != null && cached.containsWriteHandler(appId, shuffleId, partitionId)) {
    LOG.error(
        "LocalStorage: {} is corrupted. Switching another storage for event: {}, some data will be lost",
        cached.getBasePath(),
        event);
  }

  List<LocalStorage> usable =
      localStorages.stream()
          .filter(s -> s.canWrite())
          .filter(s -> !s.isCorrupted())
          .collect(Collectors.toList());
  if (usable.isEmpty()) {
    return null;
  }

  int index = ShuffleStorageUtils.getStorageIndex(usable.size(), appId, shuffleId, partitionId);
  LocalStorage chosen = usable.get(index);

  return sortedPartitionsOfStorageMap.compute(
      cacheKey,
      (key, current) -> {
        // Keep the current mapping only when it exists, is healthy, and the event is already
        // bound to a storage; in every other case refresh the cache with the new choice.
        if (current != null && !current.isCorrupted() && event.getUnderStorage() != null) {
          return current;
        }
        event.setUnderStorage(chosen);
        return chosen;
      });
}