in server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java [179:226]
public Storage selectStorage(ShuffleDataFlushEvent event) {
String appId = event.getAppId();
int shuffleId = event.getShuffleId();
int partitionId = event.getStartPartition();
LocalStorage storage =
partitionsOfStorage.get(UnionKey.buildKey(appId, shuffleId, partitionId));
if (storage != null) {
if (storage.isCorrupted()) {
if (storage.containsWriteHandler(appId, shuffleId, partitionId)) {
LOG.error(
"LocalStorage: {} is corrupted. Switching another storage for event: {}, some data will be lost",
storage.getBasePath(),
event);
}
} else {
if (event.getUnderStorage() == null) {
event.setUnderStorage(storage);
}
return storage;
}
}
List<LocalStorage> candidates =
localStorages.stream()
.filter(x -> x.canWrite() && !x.isCorrupted())
.collect(Collectors.toList());
if (candidates.size() == 0) {
return null;
}
final LocalStorage selectedStorage =
candidates.get(
ShuffleStorageUtils.getStorageIndex(candidates.size(), appId, shuffleId, partitionId));
return partitionsOfStorage.compute(
UnionKey.buildKey(appId, shuffleId, partitionId),
(key, localStorage) -> {
// If this is the first time to select storage or existing storage is corrupted,
// we should refresh the cache.
if (localStorage == null
|| localStorage.isCorrupted()
|| event.getUnderStorage() == null) {
event.setUnderStorage(selectedStorage);
return selectedStorage;
}
return localStorage;
});
}